Peterfei

上主是我的牧者,我实在一无所缺


  • 首页

  • 归档

  • 标签

大数据-实时计算Flink之FlinkSQL

发表于 2022-09-17   |  

背景

大数据处理技术现今已广泛应用于各个行业,为业务解决海量存储和海量分析的需求。但数据量的爆发式增长,对数据处理能力提出了更大的挑战,同时对时效性也提出了更高的要求。业务通常已不再满足滞后的分析结果,希望看到更实时的数据,从而在第一时间做出判断和决策。典型的场景如电商大促和金融风控等,基于延迟数据的分析结果已经失去了价值。

如何构建一个统一的数据湖存储,并在其上进行多种形式的数据分析,成了企业构建大数据生态的一个重要方向。如何快速、一致、原子性地在数据湖存储上构建起 Data Pipeline,成了亟待解决的问题。

下载flink-sql-connector-mysql-cdc, flink-sql-connector-kafka ,flink-sql-connector-hive到 /usr/lib/flink/lib/

1
2
3
4
5
6
7
wget https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.12/1.14.2/flink-sql-connector-kafka_2.12-1.14.2.jar
wget https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.2.1/flink-sql-connector-mysql-cdc-2.2.1.jar
wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-hive-3.1.2_2.12/1.14.2/flink-sql-connector-hive-3.1.2_2.12-1.14.2.jar
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc_2.12/1.14.2/flink-connector-jdbc_2.12-1.14.2.jar

Flink1.11引入了CDC的connector,通过这种方式可以很方便地捕获变化的数据,大大简化了数据处理的流程。Flink1.11的CDC connector主要包括:MySQL CDC和Postgres CDC,同时对Kafka的Connector支持canal-json和debezium-json以及changelog-json的format。

简介

Flink CDC Connector 是ApacheFlink的一组数据源连接器,使用变化数据捕获change data capture (CDC))从不同的数据库中提取变更数据。Flink CDC连接器将Debezium集成为引擎来捕获数据变更。因此,它可以充分利用Debezium的功能。

特点

  • 支持读取数据库快照,并且能够持续读取数据库的变更日志,即使发生故障,也支持exactly-once 的处理语义
  • 对于DataStream API的CDC connector,用户无需部署Debezium和Kafka,即可在单个作业中使用多个数据库和表上的变更数据。
  • 对于Table/SQL API 的CDC connector,用户可以使用SQL DDL创建CDC数据源,来监视单个表上的数据变更。

使用场景

  • 数据库之间的增量数据同步
  • 审计日志
  • 数据库之上的实时物化视图
  • 基于CDC的维表join

Flink提供的 table format

Flink提供了一系列可以用于table connector的table format,具体如下:

Formats Supported Connectors
CSV Apache Kafka, Filesystem
JSON Apache Kafka, Filesystem, Elasticsearch
Apache Avro Apache Kafka, Filesystem
Debezium CDC Apache Kafka
Canal CDC Apache Kafka
Apache Parquet Filesystem
Apache ORC Filesystem

使用过程中的注意点

使用MySQL CDC的注意点

如果要使用MySQL CDC connector,对于程序而言,需要添加如下依赖:

1
2
3
4
5
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>1.0.0</version>
</dependency>

如果要使用Flink SQL Client,需要添加如下jar包:flink-sql-connector-mysql-cdc-1.0.0.jar,将该jar包放在Flink安装目录的lib文件夹下即可。

之前, mysql 中需加入配置

1
2
3
4
server_id=1
log_bin=mysql-bin
binlog_format=ROW
binlog_row_image=FULL

重启mysql.

mysql-cdc的操作实践

创建MySQL数据源表

在创建MySQL CDC表之前,需要先创建MySQL的数据表,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
-- MySQL
/*Table structure for table `order_info` */
DROP TABLE IF EXISTS `order_info`;
CREATE TABLE `order_info` (
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '编号',
`consignee` varchar(100) DEFAULT NULL COMMENT '收货人',
`consignee_tel` varchar(20) DEFAULT NULL COMMENT '收件人电话',
`total_amount` decimal(10,2) DEFAULT NULL COMMENT '总金额',
`order_status` varchar(20) DEFAULT NULL COMMENT '订单状态,1表示下单,2表示支付',
`user_id` bigint(20) DEFAULT NULL COMMENT '用户id',
`payment_way` varchar(20) DEFAULT NULL COMMENT '付款方式',
`delivery_address` varchar(1000) DEFAULT NULL COMMENT '送货地址',
`order_comment` varchar(200) DEFAULT NULL COMMENT '订单备注',
`out_trade_no` varchar(50) DEFAULT NULL COMMENT '订单交易编号(第三方支付用)',
`trade_body` varchar(200) DEFAULT NULL COMMENT '订单描述(第三方支付用)',
`create_time` datetime DEFAULT NULL COMMENT '创建时间',
`operate_time` datetime DEFAULT NULL COMMENT '操作时间',
`expire_time` datetime DEFAULT NULL COMMENT '失效时间',
`tracking_no` varchar(100) DEFAULT NULL COMMENT '物流单编号',
`parent_order_id` bigint(20) DEFAULT NULL COMMENT '父订单编号',
`img_url` varchar(200) DEFAULT NULL COMMENT '图片路径',
`province_id` int(20) DEFAULT NULL COMMENT '地区',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='订单表';
-- ----------------------------
-- Records of order_info
-- ----------------------------
INSERT INTO `order_info`
VALUES (476, 'lAXjcL', '13408115089', 433.00, '2', 10, '2', 'OYyAdSdLxedceqovndCD', 'ihjAYsSjrgJMQVdFQnSy', '8728720206', '', '2020-06-18 02:21:38', NULL, NULL, NULL, NULL, NULL, 9);
INSERT INTO `order_info`
VALUES (477, 'QLiFDb', '13415139984', 772.00, '1', 90, '2', 'OizYrQbKuWvrvdfpkeSZ', 'wiBhhqhMndCCgXwmWVQq', '1679381473', '', '2020-06-18 09:12:25', NULL, NULL, NULL, NULL, NULL, 3);
INSERT INTO `order_info`
VALUES (478, 'iwKjQD', '13320383859', 88.00, '1', 107, '1', 'cbXLKtNHWOcWzJVBWdAs', 'njjsnknHxsxhuCCeNDDi', '0937074290', '', '2020-06-18 15:56:34', NULL, NULL, NULL, NULL, NULL, 7);
/*Table structure for table `order_detail` */
CREATE TABLE `order_detail` (
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '编号',
`order_id` bigint(20) DEFAULT NULL COMMENT '订单编号',
`sku_id` bigint(20) DEFAULT NULL COMMENT 'sku_id',
`sku_name` varchar(200) DEFAULT NULL COMMENT 'sku名称(冗余)',
`img_url` varchar(200) DEFAULT NULL COMMENT '图片名称(冗余)',
`order_price` decimal(10,2) DEFAULT NULL COMMENT '购买价格(下单时sku价格)',
`sku_num` varchar(200) DEFAULT NULL COMMENT '购买个数',
`create_time` datetime DEFAULT NULL COMMENT '创建时间',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='订单明细表';
-- ----------------------------
-- Records of order_detail
-- ----------------------------
INSERT INTO `order_detail`
VALUES (1329, 476, 8, 'Apple iPhone XS Max (A2104) 256GB 深空灰色 移动联通电信4G手机 双卡双待', 'http://XLMByOyZDTJQYxphQHNTgYAFzJJCKTmCbzvEJIpz', 8900.00, '3', '2020-06-18 02:21:38');
INSERT INTO `order_detail`
VALUES (1330, 477, 9, '荣耀10 GT游戏加速 AIS手持夜景 6GB+64GB 幻影蓝全网通 移动联通电信', 'http://ixOCtlYmlxEEgUfPLiLdjMftzrleOEIBKSjrhMne', 2452.00, '4', '2020-06-18 09:12:25');
INSERT INTO `order_detail`
VALUES (1331, 478, 4, '小米Play 流光渐变AI双摄 4GB+64GB 梦幻蓝 全网通4G 双卡双待 小水滴全面屏拍照游戏智能手机', 'http://RqfEFnAOqnqRnNZLFRvBuwXxwNBtptYJCILDKQYv', 1442.00, '1', '2020-06-18 15:56:34');
INSERT INTO `order_detail`
VALUES (1332, 478, 8, 'Apple iPhone XS Max (A2104) 256GB 深空灰色 移动联通电信4G手机 双卡双待', 'http://IwhuCDlsiLenfKjPzbJrIoxswdfofKhJLMzlJAKV', 8900.00, '3', '2020-06-18 15:56:34');
INSERT INTO `order_detail`
VALUES (1333, 478, 8, 'Apple iPhone XS Max (A2104) 256GB 深空灰色 移动联通电信4G手机 双卡双待', 'http://bbfwTbAzTWapywODzOtDJMJUEqNTeRTUQuCDkqXP', 8900.00, '1', '2020-06-18 15:56:34');

Flink SQL Cli创建CDC数据源

启动 Flink 集群,再启动 SQL CLI,执行下面命令:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
-- 创建订单信息表
CREATE TABLE order_info(
id BIGINT,
user_id BIGINT,
create_time TIMESTAMP(0),
operate_time TIMESTAMP(0),
province_id INT,
order_status STRING,
total_amount DECIMAL(10, 5)
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'kms-1',
'port' = '3306',
'username' = 'root',
'password' = '123qwe',
'database-name' = 'mydw',
'table-name' = 'order_info'
);

在Flink SQL Cli中查询该表的数据:result-mode: tableau,+表示数据的insert

img

在SQL CLI中创建订单详情表:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
CREATE TABLE order_detail(
id BIGINT,
order_id BIGINT,
sku_id BIGINT,
sku_name STRING,
sku_num BIGINT,
order_price DECIMAL(10, 5),
create_time TIMESTAMP(0)
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'kms-1',
'port' = '3306',
'username' = 'root',
'password' = '123qwe',
'database-name' = 'mydw',
'table-name' = 'order_detail'
);

查询结果如下:

img

执行JOIN操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
SELECT
od.id,
oi.id order_id,
oi.user_id,
oi.province_id,
od.sku_id,
od.sku_name,
od.sku_num,
od.order_price,
oi.create_time,
oi.operate_time
FROM
(
SELECT *
FROM order_info
WHERE
order_status = '2'-- 已支付
) oi
JOIN
(
SELECT *
FROM order_detail
) od
ON oi.id = od.order_id;

Headscale 之部署私有 DERP 中继服务器

发表于 2022-09-05   |  

STUN 是什么

Tailscale 的终极目标是让两台处于网络上的任何位置的机器建立点对点连接(直连),但现实世界是复杂的,大部份情况下机器都位于 NAT 和防火墙后面,这时候就需要通过打洞来实现直连,也就是 NAT 穿透。

NAT 按照 NAT 映射行为和有状态防火墙行为可以分为多种类型,但对于 NAT 穿透来说根本不需要关心这么多类型,只需要看 NAT 或者有状态防火墙是否会严格检查目标 Endpoint,根据这个因素,可以将 NAT 分为 Easy NAT 和 Hard NAT。

  • Easy NAT 及其变种称为 “Endpoint-Independent Mapping” (EIM,终点无关的映射)
    这里的 Endpoint 指的是目标 Endpoint,也就是说,有状态防火墙只要看到有客户端自己发起的出向包,就会允许相应的入向包进入,不管这个入向包是谁发进来的都可以。
  • hard NAT 以及变种称为 “Endpoint-Dependent Mapping”(EDM,终点相关的映射)
    这种 NAT 会针对每个目标 Endpoint 来生成一条相应的映射关系。 在这样的设备上,如果客户端向某个目标 Endpoint 发起了出向包,假设客户端的公网 IP 是 2.2.2.2,那么有状态防火墙就会打开一个端口,假设是 4242。那么只有来自该目标 Endpoint 的入向包才允许通过 2.2.2.2:4242,其他客户端一律不允许。这种 NAT 更加严格,所以叫 Hard NAT。

对于 Easy NAT,我们只需要提供一个第三方的服务,它能够告诉客户端“它看到的客户端的公网 ip:port 是什么”,然后将这个信息以某种方式告诉通信对端(peer),后者就知道该和哪个地址建连了!这种服务就叫 STUN (Session Traversal Utilities for NAT,NAT会话穿越应用程序)。它的工作流程如下图所示:

  • 笔记本向 STUN 服务器发送一个请求:“从你的角度看,我的地址什么?”
  • STUN 服务器返回一个响应:“我看到你的 UDP 包是从这个地址来的:ip:port”。

中继是什么

对于 Hard NAT 来说,STUN 就不好使了,即使 STUN 拿到了客户端的公网 ip:port 告诉通信对端也于事无补,因为防火墙是和 STUN 通信才打开的缺口,这个缺口只允许 STUN 的入向包进入,其他通信对端知道了这个缺口也进不来。通常企业级 NAT 都属于 Hard NAT。

这种情况下打洞是不可能了,但也不能就此放弃,可以选择一种折衷的方式:创建一个中继服务器(relay server),客户端与中继服务器进行通信,中继服务器再将包中继(relay)给通信对端。

至于中继的性能,那要看具体情况了:

  • 如果能直连,那显然没必要用中继方式;
  • 但如果无法直连,而中继路径又非常接近双方直连的真实路径,并且带宽足够大,那中继方式并不会明显降低通信质量。延迟肯定会增加一点,带宽会占用一些,但相比完全连接不上,还是可以接受的。

事实上对于大部分网络而言,Tailscale 都可以通过各种黑科技打洞成功,只有极少数情况下才会选择中继,中继只是一种 fallback 机制。

中继协议简介

中继协议有多种实现方式。

TURN

TURN 即 Traversal Using Relays around NAT,这是一种经典的中继实现方式,核心理念是:

  • 用户(人)先去公网上的 TURN 服务器认证,成功后后者会告诉你:“我已经为你分配了 ip:port,接下来将为你中继流量”,
  • 然后将这个 ip:port 地址告诉对方,让它去连接这个地址,接下去就是非常简单的客户端/服务器通信模型了。

与 STUN 不同,这种协议没有真正的交互性,不是很好用,因此 Tailscale 并没有采用 TURN 作为中继协议。

DERP

DERP 即 Detoured Encrypted Routing Protocol,这是 Tailscale 自研的一个协议:

  • 它是一个通用目的包中继协议,运行在 HTTP 之上,而大部分网络都是允许 HTTP 通信的。
  • 它根据目的公钥(destination’s public key)来中继加密的流量(encrypted payloads)。

自建私有 DERP server

Tailscale 的私钥只会保存在当前节点,因此 DERP server 无法解密流量,它只能和互联网上的其他路由器一样,呆呆地将加密的流量从一个节点转发到另一个节点,只不过 DERP 使用了一个稍微高级一点的协议来防止滥用。

Tailscale 开源了 DERP 服务器的代码,如果你感兴趣,可以阅读 DERP 的源代码。


Tailscale 官方内置了很多 DERP 服务器,分步在全球各地,惟独不包含中国大陆,原因你懂得。这就导致了一旦流量通过 DERP 服务器进行中继,延时就会非常高。而且官方提供的 DERP 服务器是万人骑,存在安全隐患。

为了实现低延迟、高安全性,我们可以参考 Tailscale 官方文档自建私有的 DERP 服务器。有两种部署模式,一种是基于域名,另外一种不需要域名,可以直接使用 IP,不过需要一点黑科技。我们先来看最简单的使用域名的方案。

还需要创建自签名证书,可以通过脚本来创建:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# build_cert.sh
#!/bin/bash
CERT_HOST=$1
CERT_DIR=$2
CONF_FILE=$3
echo "[req]
default_bits = 2048
distinguished_name = req_distinguished_name
req_extensions = req_ext
x509_extensions = v3_req
prompt = no
[req_distinguished_name]
countryName = XX
stateOrProvinceName = N/A
localityName = N/A
organizationName = Self-signed certificate
commonName = $CERT_HOST: Self-signed certificate
[req_ext]
subjectAltName = @alt_names
[v3_req]
subjectAltName = @alt_names
[alt_names]
IP.1 = $CERT_HOST
" > "$CONF_FILE"
mkdir -p "$CERT_DIR"
openssl req -x509 -nodes -days 730 -newkey rsa:2048 -keyout "$CERT_DIR/$CERT_HOST.key" -out "$CERT_DIR/$CERT_HOST.crt" -config "$CONF_FILE"

重新编写 Dockerfile,将 derper 的域名设置为 127.0.0.1:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
FROM golang:latest AS builder
WORKDIR /app
# ========= CONFIG =========
# - download links
ENV MODIFIED_DERPER_GIT=https://github.com/yangchuansheng/ip_derper.git
ENV BRANCH=ip_derper
# ==========================
# build modified derper
RUN git clone -b $BRANCH $MODIFIED_DERPER_GIT tailscale --depth 1 && \
cd /app/tailscale/cmd/derper && \
/usr/local/go/bin/go build -ldflags "-s -w" -o /app/derper && \
cd /app && \
rm -rf /app/tailscale
FROM ubuntu:20.04
WORKDIR /app
# ========= CONFIG =========
# - derper args
ENV DERP_HOST=127.0.0.1
ENV DERP_CERTS=/app/certs/
ENV DERP_STUN true
ENV DERP_VERIFY_CLIENTS false
# ==========================
# apt
RUN apt-get update && \
apt-get install -y openssl curl
COPY build_cert.sh /app/
COPY --from=builder /app/derper /app/derper
# build self-signed certs && start derper
CMD bash /app/build_cert.sh $DERP_HOST $DERP_CERTS /app/san.conf && \
/app/derper --hostname=$DERP_HOST \
--certmode=manual \
--certdir=$DERP_CERTS \
--stun=$DERP_STUN \
--verify-clients=$DERP_VERIFY_CLIENTS

构建好镜像后,就可以在你想部署 derper 的主机上直接通过该镜像启动 derper 容器了,命令如下:

1
🐳 → docker run --restart always --net host --name derper -p 12345:12345 -p 3478:3478/udp -e DERP_ADDR=:12345 -d ghcr.io/yangchuansheng/ip_derper

Headscale 的本地 YAML 文件目前还不支持这个配置项,所以没办法,咱只能使用在线 URL 了。JSON 配置内容如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
[root@jetlink headscale]# cat derp.json
{
"Regions": {
"901": {
"RegionID": 901,
"RegionCode": "ali-sh",
"RegionName": "Aliyun Shanghai",
"Nodes": [
{
"Name": "901a",
"RegionID": 901,
"DERPPort": 12345,
"HostName": "xx.xx.xx.118",
"IPv4": "xx.xx.xx.118",
"InsecureForTests": true
}
]
}
}
}

配置解析:

  • HostName 直接填 derper 的公网 IP,即和 IPv4 的值相同。
  • InsecureForTests 一定要设置为 true,以跳过域名验证。

你需要把这个 JSON 文件变成 Headscale 服务器可以访问的 URL,比如在 Headscale 主机上搭个 Nginx,或者上传到对象存储(比如阿里云 OSS)

接下来还需要修改 Headscale 的配置文件,引用上面的自定义 DERP 的 URL。需要修改的配置项如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# /etc/headscale/config.yaml
derp:
# List of externally available DERP maps encoded in JSON
urls:
# - https://controlplane.tailscale.com/derpmap/default
- https://xxxxx/derp.json
# Locally available DERP map files encoded in YAML
#
# This option is mostly interesting for people hosting
# their own DERP servers:
# https://tailscale.com/kb/1118/custom-derp-servers/
#
# paths:
# - /etc/headscale/derp-example.yaml
paths:
- /etc/headscale/derp.yaml
# If enabled, a worker will be set up to periodically
# refresh the given sources and update the derpmap
# will be set up.
auto_update_enabled: true
# How often should we check for DERP updates?
update_frequency: 24h

修改完配置后,重启 headscale 服务:

1
$ systemctl restart headscale

在 Tailscale 客户端上使用以下命令查看目前可以使用的 DERP 服务器:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
$ tailscale netcheck
Report:
* UDP: true
* IPv4: yes, 192.168.100.1:49656
* IPv6: no
* MappingVariesByDestIP: true
* HairPinning: false
* PortMapping: UPnP
* Nearest DERP: Home Hangzhou
* DERP latency:
- home: 9.7ms (Home Hangzhou)
- hs: 25.2ms (Huawei Shanghai)
- thk: 43.5ms (Tencent Hongkong)

再次查看与通信对端的连接方式:

1
2
3
4
$ tailscale status
coredns default linux active; direct xxxx:45986; offline, tx 131012 rx 196020
oneplus-8t default android active; relay "home"; offline, tx 211900 rx 22780
openwrt default linux active; direct 192.168.100.254:41641; offline, tx 189868 rx 4074772

可以看到这一次 Tailscale 自动选择了一个线路最优的国内的 DERP 服务器作为中继,可以测试一下延迟:

1
2
3
4
$ tailscale ping 10.1.0.8
pong from oneplus-8t (10.1.0.8) via DERP(home) in 30ms
pong from oneplus-8t (10.1.0.8) via DERP(home) in 45ms
pong from oneplus-8t (10.1.0.8) via DERP(home) in 30ms

完美!这里的 home 当然是我的家庭宽带,部署方式与上面所说的国内云主机类似,你需要额外开启公网的端口映射(12345/tcp, 3478/udp)。还有一点需要注意的是配置内容:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
{
"Regions": {
"901": {
"RegionID": 901,
"RegionCode": "ali-sh",
"RegionName": "Aliyun Shanghai",
"Nodes": [
{
"Name": "901a",
"RegionID": 901,
"DERPPort": 443,
"HostName": "xxxx",
"IPv4": "xxxx",
"InsecureForTests": true
}
]
},
"902": {
"RegionID": 902,
"RegionCode": "home",
"RegionName": "Home Hangzhou",
"Nodes": [
{
"Name": "902a",
"RegionID": 902,
"DERPPort": 12345,
"HostName": "xxxx",
"InsecureForTests": true
}
]
}
}
}

与国内云主机相比,家庭宽带的配置有两点不同:

  • 需要删除 IPv4 配置项。因为家用宽带的公网 IP 是动态变化的,所以你需要使用 DDNS 来动态解析公网 IP。
  • HostName 最好填域名,因为你的公网 IP 是动态变化的,没法填写 IP,除非你不停地修改配置文件。填域名也没关系啦,反正不会验证域名的,也不用关心证书的事情,只要域名能解析到你的公网 IP 即可。

防止 DERP 被白嫖

默认情况下 DERP 服务器是可以被白嫖的,只要别人知道了你的 DERP 服务器的地址和端口,就可以为他所用。如果你的服务器是个小水管,用的人多了可能会把你撑爆,因此我们需要修改配置来防止被白嫖。

特别声明:只有使用域名的方式才可以通过认证防止被白嫖,使用纯 IP 的方式无法防白嫖,你只能小心翼翼地隐藏好你的 IP 和端口,不能让别人知道。

只需要做两件事情:

1、在 DERP 服务器上安装 Tailscale。

第一步需要在 DERP 服务所在的主机上安装 Tailscale 客户端,启动 tailscaled 进程。

2、derper 启动时加上参数 --verify-clients。

本文推荐的是通过容器启动, Dockerfile 内容如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
FROM golang:latest AS builder
LABEL org.opencontainers.image.source https://github.com/yangchuansheng/docker-image
WORKDIR /app
# https://tailscale.com/kb/1118/custom-derp-servers/
RUN go install tailscale.com/cmd/derper@main
FROM ubuntu
WORKDIR /app
ARG DEBIAN_FRONTEND=noninteractive
RUN apt-get update && \
apt-get install -y --no-install-recommends apt-utils && \
apt-get install -y ca-certificates && \
mkdir /app/certs
ENV DERP_DOMAIN your-hostname.com
ENV DERP_CERT_MODE letsencrypt
ENV DERP_CERT_DIR /app/certs
ENV DERP_ADDR :443
ENV DERP_STUN true
ENV DERP_HTTP_PORT 80
ENV DERP_VERIFY_CLIENTS false
COPY --from=builder /go/bin/derper .
CMD /app/derper --hostname=$DERP_DOMAIN \
--certmode=$DERP_CERT_MODE \
--certdir=$DERP_CERT_DIR \
--a=$DERP_ADDR \
--stun=$DERP_STUN \
--http-port=$DERP_HTTP_PORT \
--verify-clients=$DERP_VERIFY_CLIENTS

默认情况下 --verify-clients 参数设置的是 false。我们不需要对 Dockerfile 内容做任何改动,只需在容器启动时加上环境变量即可,将之前的启动命令修改一下:

1
2
3
4
5
6
7
8
🐳 → docker run --restart always \
--name derper -p 12345:12345 -p 3478:3478/udp \
-v /root/.acme.sh/xxxx/:/app/certs \
-e DERP_CERT_MODE=manual \
-e DERP_ADDR=:12345 \
-e DERP_DOMAIN=xxxx \
-e DERP_VERIFY_CLIENTS=true \
-d ghcr.io/yangchuansheng/derper:latest

这样就大功告成了,别人即使知道了你的 DERP 服务器地址也无法使用,但还是要说明一点,即便如此,你也应该尽量不让别人知道你的服务器地址,防止别人有可趁之机。

总结

本文给大家介绍了 STUN 对于辅助 NAT 穿透的意义,科普了几种常见的中继协议,包含 Tailscale 自研的 DERP 协议。最后手把手教大家如何自建私有的 DERP 服务器,并让 Tailscale 使用我们自建的 DERP 服务器。

使用Tailscale搭建VPN专线

发表于 2022-09-05   |  

Tailscale 是什么

Tailscale 是一种基于 WireGuard 的虚拟组网工具.

与 OpenVPN 之流相比还是能甩好几十条街的,Tailscale 虽然在性能上做了些许取舍,但在功能和易用性上绝对是完爆其他工具:

  1. 开箱即用
  • 无需配置防火墙
  • 没有额外的配置
  1. 高安全性/私密性
  • 自动密钥轮换
  • 点对点连接
  • 支持用户审查端到端的访问记录
  1. 在原有的 ICE、STUN 等 UDP 协议外,实现了 DERP TCP 协议来实现 NAT 穿透
  2. 基于公网的控制服务器下发 ACL 和配置,实现节点动态更新
  3. 通过第三方(如 Google) SSO 服务生成用户和私钥,实现身份认证

简而言之,我们可以将 Tailscale 看成是更为易用、功能更完善的 WireGuard。

Headscale 是什么

Tailscale 的控制服务器是不开源的,而且对免费用户有诸多限制,这是人家的摇钱树,可以理解。好在目前有一款开源的实现叫 Headscale,这也是唯一的一款,希望能发展壮大。

Headscale 由欧洲航天局的 Juan Font 使用 Go 语言开发,在 BSD 许可下发布,实现了 Tailscale 控制服务器的所有主要功能,可以部署在企业内部,没有任何设备数量的限制,且所有的网络流量都由自己控制

Headscale 部署

Headscale 部署很简单,推荐直接在 Linux 主机上安装

首先需要到其 GitHub 仓库的 Release 页面下载最新版的二进制文件。

1
2
3
4
$ wget --output-document=/usr/local/bin/headscale \
https://github.com/juanfont/headscale/releases/download/v<HEADSCALE VERSION>/headscale_<HEADSCALE VERSION>_linux_<ARCH>
$ chmod +x /usr/local/bin/headscale

创建配置目录:

1
$ mkdir -p /etc/headscale

创建目录用来存储数据与证书:

1
$ mkdir -p /var/lib/headscale

创建空的 SQLite 数据库文件:

1
$ touch /var/lib/headscale/db.sqlite

创建 Headscale 配置文件:

1
$ wget https://github.com/juanfont/headscale/raw/main/config-example.yaml -O /etc/headscale/config.yaml
  • 修改配置文件,将 server_url 改为公网 IP 或域名。如果是国内服务器,域名必须要备案。我的域名无法备案,所以我就直接用公网 IP 了。

  • 如果暂时用不到 DNS 功能,可以先将 magic_dns 设为 false。

  • server_url 设置为 http://<PUBLIC_IP>:8080,将 <PUBLIC_IP> 替换为公网 IP 或者域名。

  • 可自定义私有网段,也可同时开启 IPv4 和 IPv6:

    1
    2
    3
    ip_prefixes:
    # - fd7a:115c:a1e0::/48
    - 10.1.0.0/16

创建 SystemD service 配置文件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# /etc/systemd/system/headscale.service
[Unit]
Description=headscale controller
After=syslog.target
After=network.target
[Service]
Type=simple
User=headscale
Group=headscale
ExecStart=/usr/local/bin/headscale serve
Restart=always
RestartSec=5
# Optional security enhancements
NoNewPrivileges=yes
PrivateTmp=yes
ProtectSystem=strict
ProtectHome=yes
ReadWritePaths=/var/lib/headscale /var/run/headscale
AmbientCapabilities=CAP_NET_BIND_SERVICE
RuntimeDirectory=headscale
[Install]
WantedBy=multi-user.target

创建 headscale 用户:

1
$ useradd headscale -d /home/headscale -m

修改 /var/lib/headscale 目录的 owner:

1
$ chown -R headscale:headscale /var/lib/headscale

修改配置文件中的 unix_socket:

1
unix_socket: /var/run/headscale/headscale.sock

Reload SystemD 以加载新的配置文件:

1
$ systemctl daemon-reload

启动 Headscale 服务并设置开机自启:

1
$ systemctl enable --now headscale

查看运行状态:

1
$ systemctl status headscale

查看占用端口:

1
2
3
4
5
6
7
8
9
10
11
12
13
$ ss -tulnp|grep headscale
tcp LISTEN 0 1024 [::]:9090 [::]:* users:(("headscale",pi
d=10899,fd=13))
tcp LISTEN 0 1024 [::]:50443 [::]:* users:(("headscale",pi
d=10899,fd=10))
tcp LISTEN 0 1024 [::]:8080 [::]:* users:(("headscale",pi
d=10899,fd=12))

ailscale 中有一个概念叫 tailnet,你可以理解成租户,租户与租户之间是相互隔离的,具体看参考 Tailscale 的官方文档: What is a tailnet。Headscale 也有类似的实现叫 namespace,即命名空间。我们需要先创建一个 namespace,以便后续客户端接入,例如

1
headscale namespaces create vpn

查看命名空间:

1
2
3
4
5
$ headscale namespaces list
ID | Name | Created
1 | vpn | 2022-03-09 06:12:06

Tailscale 客户端接入

Linux

Tailscale 官方提供了各种 Linux 发行版的软件包,但国内的网络你懂得,软件源根本用不了。好在官方还提供了 静态编译的二进制文件,我们可以直接下载。例如:

1
$ wget https://pkgs.tailscale.com/stable/tailscale_1.22.2_amd64.tgz

解压:

1
2
3
4
5
6
7
$ tar zxvf tailscale_1.22.2_amd64.tgz
x tailscale_1.22.2_amd64/
x tailscale_1.22.2_amd64/tailscale
x tailscale_1.22.2_amd64/tailscaled
x tailscale_1.22.2_amd64/systemd/
x tailscale_1.22.2_amd64/systemd/tailscaled.defaults
x tailscale_1.22.2_amd64/systemd/tailscaled.service

将二进制文件复制到官方软件包默认的路径下:

1
2
$ cp tailscale_1.22.2_amd64/tailscaled /usr/sbin/tailscaled
$ cp tailscale_1.22.2_amd64/tailscale /usr/bin/tailscale

将 systemD service 配置文件复制到系统路径下:

1
$ cp tailscale_1.22.2_amd64/systemd/tailscaled.service /lib/systemd/system/tailscaled.service

将环境变量配置文件复制到系统路径下:

1
$ cp tailscale_1.22.2_amd64/systemd/tailscaled.defaults /etc/default/tailscaled

启动 tailscaled.service 并设置开机自启:

1
$ systemctl enable --now tailscaled

查看服务状态:

1
$ systemctl status tailscaled

Tailscale 接入 Headscale:

1
2
# 将 <HEADSCALE_PUB_IP> 换成你的 Headscale 公网 IP 或域名
$ tailscale up --login-server=http://xx.xx.xx.xx:8080 --accept-dns=false --advertise-routes=192.168.xx.0/24

这里推荐将 DNS 功能关闭,因为它会覆盖系统的默认 DNS。如果你对 DNS 有需求,可自己研究官方文档,这里不再赘述。

执行完上面的命令后,会出现下面的信息:

1
2
3
4
5
To authenticate, visit:
http://xxxxxx:8080/register?key=905cf165204800247fbd33989dbc22be95c987286c45aac303393704
1150d846

在浏览器中打开该链接,就会出现如下的界面

将其中的命令复制粘贴到 headscale 所在机器的终端中,并将 NAMESPACE 替换为前面所创建的 namespace。

1
2
$ headscale -n vpn nodes register --key 905cf165204800247fbd33989dbc22be95c987286c45aac3033937041150d846
Machine register

注册成功,查看注册的节点:

1
2
3
4
5
6
7
8
9
$ headscale nodes list
ID | Name | NodeKey | Namespace | IP addresses | Ephemeral | Last seen | Onlin
e | Expired
1 | coredns | [Ew3RB] | vpn | 10.1.0.1 | false | 2022-03-20 09:08:58 | onlin
e | no

回到 Tailscale 客户端所在的 Linux 主机,可以看到 Tailscale 会自动创建相关的路由表和 iptables 规则。路由表可通过以下命令查看

1
ip route show table 52

macOS

macOS 有 3 种安装方法:

  • 直接通过应用商店安装,地址: https://apps.apple.com/ca/app/tailscale/id1475387142。前提是你**需要一个美区 ID**。。。
  • 下载 安装包直接安装,绕过应用商店。
  • 安装开源的命令行工具 tailscale 和 tailscaled。相关链接: https://github.com/tailscale/tailscale/wiki/Tailscaled-on-macOS。

这三种安装包的核心数据包处理代码是相同的,唯一的区别在于在于打包方式以及与系统的交互方式

安装完 GUI 版应用后还需要做一些骚操作,才能让 Tailscale 使用 Headscale 作为控制服务器。当然,Headscale 已经给我们提供了详细的操作步骤,你只需要在浏览器中打开 URL:http://<HEADSCALE_PUB_IP>:8080/apple,便会出现如下的界面:

非应用商店版本的 macOS 客户端需要将 io.tailscale.ipn.macos 替换为 io.tailscale.ipn.macsys。即:defaults write io.tailscale.ipn.macsys ControlURL http://<HEADSCALE_PUB_IP>:8080

修改完成后重启 Tailscale 客户端,在 macOS 顶部状态栏中找到 Tailscale 并点击,然后再点击 Log in

然后立马就会跳转到浏览器并打开一个页面。

接下来与之前 Linux 客户端相同,回到 Headscale 所在的机器执行浏览器中的命令即可,注册成功

回到 Headscale 所在主机,查看注册的节点:

1
2
3
4
5
6
7
8
9
10
$ headscale nodes list
ID | Name | NodeKey | Namespace | IP addresses | Ephemeral | Last seen | Onlin
e | Expired
1 | coredns | [Ew3RB] | default | 10.1.0.1 | false | 2022-03-20 09:08:58 | onlin
e | no
2 | carsondemacbook-pro | [k7bzX] | default | 10.1.0.2 | false | 2022-03-20 09:48:30 | online | no

回到 macOS,测试是否能 ping 通对端节点:

1
2
3
4
5
6
7
8
$ ping -c 2 10.1.0.1
PING 10.1.0.1 (10.1.0.1): 56 data bytes
64 bytes from 10.1.0.1: icmp_seq=0 ttl=64 time=37.025 ms
64 bytes from 10.1.0.1: icmp_seq=1 ttl=64 time=38.181 ms
--- 10.1.0.1 ping statistics ---
2 packets transmitted, 2 packets received, 0.0% packet loss
round-trip min/avg/max/stddev = 37.025/37.603/38.181/0.578 ms

也可以使用 Tailscale CLI 来测试:

1
2
$ /Applications/Tailscale.app/Contents/MacOS/Tailscale ping 10.1.0.1
pong from coredns (10.1.0.1) via xxxx:41641 in 36ms

Windows

Windows Tailscale 客户端想要使用 Headscale 作为控制服务器,只需在浏览器中打开 URL:http://<HEADSCALE_PUB_IP>:8080/windows,便会出现如下的界面

按照其中的步骤操作即可。

通过 Pre-Authkeys 接入

前面的接入方法都需要服务端同意,步骤比较烦琐,其实还有更简单的方法,可以直接接入,不需要服务端同意。

首先在服务端生成 pre-authkey 的 token,有效期可以设置为 24 小时:

1
$ headscale preauthkeys create -e 24h -n default

查看已经生成的 key:

1
2
3
$ headscale -n default preauthkeys list
ID | Key | Reusable | Ephemeral | Used | Expiration | Created
1 | 57e419c40e30b0dxxxxxxxf15562c18a8c6xxxx28ae76f57 | false | false | false | 2022-05-30 07:14:17 | 2022-05-29 07:14:17

现在新节点就可以无需服务端同意直接接入了:

1
$ tailscale up --login-server=http://<HEADSCALE_PUB_IP>:8080 --accept-routes=true --accept-dns=false --authkey $KEY

打通局域网

到目前为止我们只是打造了一个点对点的 Mesh 网络,各个节点之间都可以通过 WireGuard 的私有网络 IP 进行直连。但我们可以更大胆一点,还记得我在文章开头提到的访问家庭内网的资源吗?我们可以通过适当的配置让每个节点都能访问其他节点的局域网 IP。这个使用场景就比较多了,你可以直接访问家庭内网的 NAS,或者内网的任何一个服务,更高级的玩家可以使用这个方法来访问云上 Kubernetes 集群的 Pod IP 和 Service IP。

假设你的家庭内网有一台 Linux 主机(比如 OpenWrt)安装了 Tailscale 客户端,我们希望其他 Tailscale 客户端可以直接通过家中的局域网 IP(例如 192.168.100.0/24) 访问家庭内网的任何一台设备。

配置方法很简单,首先需要设置 IPv4 与 IPv6 路由转发:

1
2
3
$ echo 'net.ipv4.ip_forward = 1' | tee /etc/sysctl.d/ipforwarding.conf
$ echo 'net.ipv6.conf.all.forwarding = 1' | tee -a /etc/sysctl.d/ipforwarding.conf
$ sysctl -p /etc/sysctl.d/ipforwarding.conf

客户端修改注册节点的命令,在原来命令的基础上加上参数 --advertise-routes=192.168.100.0/24。

1
$ tailscale up --login-server=http://<HEADSCALE_PUB_IP>:8080 --accept-routes=true --accept-dns=false --advertise-routes=192.168.100.0/24

在 Headscale 端查看路由,可以看到相关路由是关闭的。

1
2
3
4
5
6
7
8
9
10
11
$ headscale nodes list|grep openwrt
6 | openwrt | [7LdVc] | default | 10.1.0.6 | false | 2022-03-20 15:50:46 | onlin
e | no
$ headscale routes list -i 6
Route | Enabled
192.168.100.0/24 | false

开启路由:

1
2
3
4
5
$ headscale routes enable -i 6 -r "192.168.100.0/24"
Route | Enabled
192.168.100.0/24 | true

其他节点查看路由结果:

1
2
$ ip route show table 52|grep "192.168.100.0/24"
192.168.100.0/24 dev tailscale0

现在你在任何一个 Tailscale 客户端所在的节点都可以 ping 通家庭内网的机器.


Elasticsearch 之Data stream

发表于 2022-08-26   |  
  • 数据流
  • Elasticsearch

Data stream 的概念

时序性数据

时间序列数据( time series data )是在不同时间上收集到的数据,用于所描述现象随时间变化的情况。这类数据反映了某一事物、现象等,随时间的变化状态或程度。

总的来说,这类数据主要基于时间特性明显,随着时间的流逝,往往过去时间的数据没有现在时间的重要或者敏感。

对于 Elastisearch 处理时序性数据,有人总结了主要有以下特点:

  • 由时间戳 + 数据组成。基于时间的事件,可以是服务器日志或者社交媒体流。
  • 通常搜索最近事件,旧文件变得不太重要。
  • 索引的使用主要基于时间,但是数据并不一定随着时间均衡分布。
  • 时序性数据一旦存入后很少修改。
  • 时序性数据随着时间的增加,数据量会很大。

Elastisearch 在时序性数据的使用中,往往会有以下的缺点:

  • 索引随着时间增加而数目较多。
  • 索引大小无法均衡。
  • 管理索引成本较高,需要维护 merge 合并删除等一系列任务。
  • 节点资源与冷热数据分布不匹配。

在这样的一个场景下,数据流 Data stream 应运而生。

Data stream (数据流)是 Elastic Stack 7.9 的一个新的功能。Data stream 可以跨多个索引存储只追加时序性数据,同时为查询写入等请求提供唯一的一个命名资源。Data stream 非常适合日志,事件,指标以及其他持续生成的数据。

简单来说,Data stream 根据模板生成存储数据的后备索引,然后自动将搜索或者索引请求路由到存储流数据的后备索引。而这些后备索引则根据索引生命周期管理( ILM )来自动管理。

Data stream 的组成

数据流在 Elasticsearch 集群中由一个或多个隐藏的、自动生成的后备索引组成。

在实际的 Elasticsearch 操作中,数据流依靠索引模板来设定数据流实体的后备索引。

  • 模板包含用于配置流的后备索引的映射和设置。
  • 同一个索引模板可用于多个数据流。
  • 不能删除数据流正在使用的索引模板。

每个索引到数据流的文档,必须包含一个 @timestamp 字段,映射为 date 或 date_nanos 字段类型。如果索引模板没有为 @timestamp 字段指定映射,Elasticsearch 将 @timestamp 映射为带有默认选项的日期字段。

Data stream 的读请求主要如下图,数据流自动将请求路由到其所有后备索引。

而对于写请求,数据流则将该请求自动转发给最新的后备索引。

Data stream 的特性

生成

每个 Data stream 的后备索引都有一个 generation 数,一个六位数,零填充的整数,从 000001 开始,用作该流的 rollover 的计数。

后备索引名主要依照以下格式:

.ds–

Generation 越大,后备索引包含的数据越新。 例如,web-server-logs 数据流最新的 generation 为 34。该流的最新后备索引名为 .ds-web-server-logs-000034。

注意:某些操作(例如 shrink 或 restore )可以更改后备索引的名称。 这些名称更改不会从其数据流中删除后备索引。

Rollover

在 Data stream 的使用中,rollover 是必不可少的条件。

创建数据流时,Elasticsearch 会自动为该 Data stream 根据 template 创建一个后备索引。 该索引还充当流的第一个写入索引。当满足一定条件时, rollover 会创建一个新的后备索引,该后备索引将成为 Data stream 的新写入索引。

当然 rollover 的条件设置主要依靠 ILM。 如果需要,你还可以手动将数据 rollover 。

追加

由于时序性数据的特征,Data stream 的设计场景中,数据是只追加的,极少需要修改删除。如果实际需要修改删除,则可以考虑以下操作:

  • 对于数据流只能通过 update by query 或者 delete by query 操作,不能进行 update 或者 delete 文档。
  • 需要 delete 或者 update 文档,则直接对后备索引操作。
  • 需要经常删除或者修改文档的,请使用索引别名或者索引模板,不要对 Data stream 操作。

Data stream 的使用

创建索引生命周期管理策略 ILM

索引生命周期管理策略 ILM 的主要配置细节见索引周期管理一章,此处主要做 hot 和 delete 阶段的设置,用于 rollover 的引用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
PUT /_ilm/policy/my-data-stream-policy
{
"policy": {
"phases": {
"hot": {
"actions": {
"rollover": {
"max_size": "10mb"
}
}
}
}
}
}

创建索引模板

索引模板是后备索引设置,以及 mapping 的主要配置来源,此处不展开延伸。主要设置 Data stream 相关的部分。

相关命令:

1
2
3
4
5
6
7
8
9
10
11
PUT /_index_template/my-data-stream-template
{
"index_patterns": [ "my-data-stream*" ],
"data_stream": { },
"priority": 200,
"template": {
"settings": {
"index.lifecycle.name": "my-data-stream-policy"
}
}
}

注意:

  1. 定义 data_stream 为一个空的 object ,这是必要的。
  2. Template 中使用了上一步创建的 ILM 策略 my-data-stream-policy。

此外,还需要注意两点:

  1. Elasticsearch 有一些内置索引模板如 metric-- 和 logs-- ,默认优先级 priority 是 100。如果有重名使用,则可以调高优先级,防止被默认的覆盖。
  2. 索引模板默认将 @timestamp 字段设置为 date 属性

创建 Data stream

可以自动利用 template 的匹配模式新增文档创建

1
PUT /_data_stream/my-data-stream

删除

删除命令:

1
DELETE /_data_stream/my-data-stream

删除数据流会将数据流的后备索引一起删除。

使用 Data stream

此处对数据流的操作主要以命令为主

新增数据

Data stream 在新增数据时是只追加的模式,因此在固定 id 和 bulk 的模式下,op_type 是指定 create 的。

如下面命令:

1
2
POST my-data-stream/_create/1
{"@timestamp":"2020-12-07T11:06:07.000Z","test":1}

或者

1
2
3
4
5
6
7
PUT /my-data-stream/_bulk?refresh
{"create":{ }}
{ "@timestamp": "2020-12-08T11:04:05.000Z", "user": { "id": "vlb44hny" }, "message": "Login attempt failed" }
{"create":{ }}
{ "@timestamp": "2020-12-08T11:06:07.000Z", "user": { "id": "8a4f500d" }, "message": "Login successful" }
{"create":{ }}
{ "@timestamp": "2020-12-09T11:07:08.000Z", "user": { "id": "l7gk7f82" }, "message": "Logout successful" }

或者

1
2
3
4
5
6
7
8
9
10
POST /_reindex
{
"source": {
"index": "kibana_sample_data_logs_new"
},
"dest": {
"index": "my-data-stream",
"op_type": "create"
}
}

因字段中@timestamp是必填,实验数据如下改动:

1
2
3
4
5
6
7
8
9
10
11
12
POST _reindex
{
"source": {
"index": "kibana_sample_data_logs"
},
"dest": {
"index": "kibana_sample_data_logs_new"
},
"script": {
"source": "ctx._source['@timestamp'] = ctx._source.remove(\"timestamp\")"
}
}

获取 Data stream 状态

使用 Data stream stats API 查看 Data stream 的状态。

1
GET /_data_stream/my-data-stream/_stats?human=true

同时需要用 _ilm/explain 获取 Data stream 后备索引所在的 ILM 策略状态。

1
GET my-data-stream/_ilm/explain

手动 rollover Data stream

使用 rollover API,手动 rollover Data stream。

1
POST my-data-stream/_rollover

Response:

1
2
3
4
5
6
7
8
9
{
"acknowledged" : true,
"shards_acknowledged" : true,
"old_index" : ".ds-my-data-stream-000001",
"new_index" : ".ds-my-data-stream-000002",
"rolled_over" : true,
"dry_run" : false,
"conditions" : { }
}

再 GET 相关 Data stream 状态,后备索引增加。

1
GET /_data_stream/my-data-stream/

Response:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
{
"data_streams" : [
{
"name" : "my-data-stream",
"timestamp_field" : {
"name" : "@timestamp"
},
"indices" : [
{
"index_name" : ".ds-my-data-stream-000001",
"index_uuid" : "AJBi0g3fRyG8-1tiH2UD2Q"
},
{
"index_name" : ".ds-my-data-stream-000002",
"index_uuid" : "AgOLGMSBSYWb4X-ID8uwtg"
}
],
"generation" : 2,
"status" : "GREEN",
"template" : "my-data-stream-template",
"ilm_policy" : "my-data-stream-policy",
"hidden" : false
}
]
}

Reindex Data stream

使用 reindex API 去复制数据到一个 Data stream。由于 Data stream 的只追加特性,在 op_type 中要选择为 create 。

1
2
3
4
5
6
7
8
9
10
POST /_reindex
{
"source": {
"index": "test"
},
"dest": {
"index": "my-data-stream",
"op_type": "create"
}
}

Response:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
{
"took" : 80,
"timed_out" : false,
"total" : 1,
"updated" : 0,
"created" : 1,
"deleted" : 0,
"batches" : 1,
"version_conflicts" : 0,
"noops" : 0,
"retries" : {
"bulk" : 0,
"search" : 0
},
"throttled_millis" : 0,
"requests_per_second" : -1.0,
"throttled_until_millis" : 0,
"failures" : [ ]
}

Delete/Update by query

针对 Data stream 只能 delete/update by query 。

相关命令:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
POST /my-data-stream/_update_by_query
{
"query": {
"match": {
"user.id": "l7gk7f82"
}
},
"script": {
"source": "ctx._source.user.id = params.new_id",
"params": {
"new_id": "XgdX0NoX"
}
}
}
POST /my-data-stream/_delete_by_query
{
"query": {
"match": {
"user.id": "vlb44hny"
}
}
}

ES ReIndex迁移数据

发表于 2022-08-26   |  

一、给目标集群新增白名单

1、elasticsearch.yml中添加reindex.remote.whitelist: [“xx.xx.xx.xx:9200”],不加http,重启ES服务;

1
2
3
4
5
6
7
http.host: 0.0.0.0
http.cors.enabled: true
http.cors.allow-origin: "*"
http.cors.allow-headers: Authorization
xpack.security.enabled: true
xpack.security.transport.ssl.enabled: true
reindex.remote.whitelist: ["124.70.107.101:30997","47.94.255.84:30997"]
1
2
3
4
5
6
7
docker ps|grep elasticsearch
docker exec -it elasticsearch /bin/bash
vi config/elasticsearch.yml
docker restart elasticsearch

二、使用PostMan修改目标主机ES参数

1、PUT _settings?pretty

1
2
3
4
{
"index.refresh_interval": -1,
"index.number_of_replicas": 0
}

三、使用reindex同步数据

1、同步数据,POST _reindex?wait_for_completion=false

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# 1、过滤数据
{
"source": {
"remote": {
"host": "http://47.94.255.84:30997",
"username": "elastic",
"password": "IQsMLRniP5BcYfoNzTBT"
},
"size": 10000,
"index": "aiot_towercrane_realtime_master",
"query": {
"bool": {
"filter": [
{
"range": {
"createTime": {
"gte": 1652262590167,
"lte": 1652262590346
}
}
},
{
"term": {
"tenantId": "1441953685304872961"
}
}
]
}
}
},
"dest": {
"index": "aiot_towercrane_realtime_master"
}
}
# 2 查询数据
{
"source": {
"remote": {
"host": "http://47.94.255.84:30997",
"username": "elastic",
"password": "IQsMLRniP5BcYfoNzTBT"
},
"index": "aiot_deeppit_mesure_day_record_master",
"query": {
"match": {
"projectId": "1447476989099610114"
}
}
},
"dest": {
"index": "aiot_deeppit_mesure_day_record_xiongan"
}
}

注:

  • 同步之前,需要删除目标主机中已存在索引,否则同步不成功。
  • 同步ES数据时,如果需要认证,目标主机添加Basic auth认证,源主机添加username和password;

2、批量修改数据 POST aiot_towercrane_param_xiongan/_update_by_query

1
2
3
4
5
6
7
8
9
10
11
http://144.7.110.56:9200/aiot_towercrane_param_xiongan/_update_by_query
{
"query": {
"match_all": {
}
},
"script": {
"source": "ctx._source.tenantId = 1504718344228839425L "
}
}
  • 注意:Long类型更新需要添加L

四、查看迁移状况

1、查看所有task

GET _tasks?detailed=true&actions=*reindex

1
http://144.7.110.56:9200/_tasks?detailed=true&actions=*reindex

GET _tasks/xxx?pretty

1
http://144.7.110.56:9200/_tasks/6kPnJej0SZKt4Y5THf6TuQ:697134?pretty

五、恢复新es设置

1、PUT _settings?pretty

1
2
3
4
{
"index.refresh_interval": "10m",
"index.number_of_replicas": 1
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
bool查询总结
must:与关系,相当于关系型数据库中的 and。
should:或关系,相当于关系型数据库中的 or。
must_not:非关系,相当于关系型数据库中的 not。
filter:过滤条件。
range:条件筛选范围。
gt:大于,相当于关系型数据库中的 >。
gte:大于等于,相当于关系型数据库中的 >=。
lt:小于,相当于关系型数据库中的 <。
lte:小于等于,相当于关系型数据库中的 <=。
123…16
peterfei

peterfei

peterfei|技术|上主是我的牧者

77 日志
14 分类
62 标签
RSS
github
© 2023 peterfei
由 Hexo 强力驱动
主题 - NexT.Mist