Elastic 使用极限网关进行数据双向切换

极限网关 (INFINI Gateway) 是一个面向 Elasticsearch 的高性能应用网关,它包含丰富的特性,使用起来也非常简单。极限网关工作的方式和普通的反向代理一样,我们一般是将网关部署在 Elasticsearch 集群前面, 将以往直接发送给 Elasticsearch 的请求都发送给网关,再由网关转发给请求到后端的 Elasticsearch 集群。因为网关位于在用户端和后端 Elasticsearch 之间,所以网关在中间可以做非常多的事情, 比如可以实现索引级别的限速限流、常见查询的缓存加速、查询请求的审计、查询结果的动态修改等等

下载安装

1
2
3
wget https://release.infinilabs.com/gateway/stable/gateway-1.8.6-769-linux-amd64.tar.gz
tar vxzf gateway-1.8.6-769-linux-amd64.tar.gz
mv gateway-linux-amd64 bin/gateway

验证安装

极限网关下载解压之后,我们可以执行这个命令来验证安装包是否有效,如下:

1
2
✗ ./bin/gateway -v
gateway 1.0.0_SNAPSHOT 2021-01-03 22:45:28 6a54bb2

如果能够正常看到上面的版本信息,说明网关程序本身一切正常。

启动网关

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
[root@k8s-master gateway]# cat /opt/gateway/gateway.yml
# 数据路径
path.data: data
# 日志路径
path.logs: log
# 定义 Elasticsearch 集群地址
elasticsearch:
# cluster01 集群
- name: cluster01
enabled: true
endpoint: http://192.168.10.15:9200
basic_auth:
username: elastic
password: IQsMLRniP5BcYfoNzTBT
discovery:
enabled: true
refresh:
enabled: true
interval: 1s
# cluster02 集群
- name: cluster02
enabled: true
endpoint: http://192.168.10.15:30993
basic_auth:
username: elastic
password: IQsMLRniP5BcYfoNzTBT
discovery:
enabled: true
refresh:
enabled: true
interval: 1s
# 定义网关入口
entry:
- name: my_es_entry
enabled: true
router: my_router
network:
binding: 0.0.0.0:8000
# 定义工作流
flow:
- name: auth-flow
filter:
#- basic_auth:
# valid_users:
# elastic: ******
- set_basic_auth:
username: elastic
password: IQsMLRniP5BcYfoNzTBT
- name: set-auth-for-backup-flow
filter:
- set_basic_auth: #覆盖备集群的身份信息用于备集群正常处理请求
username: elastic
password: IQsMLRniP5BcYfoNzTBT
# 写请求优先发给主集群, 当主集群不可用时发给备集群
# 当主集群数据写入成功时,记录到队列中,异步消费写入备集群
- name: write-flow
filter:
- flow:
flows:
- auth-flow
- if:
# 当主集群可用时
cluster_available: ["cluster01"]
then:
# 先将数据写入主集群
- elasticsearch:
elasticsearch: "cluster01"
# 写入消息队列,等待 pipeline 异步消费到备集群
- queue:
queue_name: "cluster02-queue"
else:
- elasticsearch:
elasticsearch: "cluster02"
- queue:
queue_name: "cluster01-queue"
# 读请求优先发给主集群, 当主集群不可用时发给备集群
- name: read-flow
filter:
- flow:
flows:
- set-auth-for-backup-flow
- if:
cluster_available: ["cluster01"]
then:
- elasticsearch:
elasticsearch: "cluster01"
else:
- elasticsearch:
elasticsearch: "cluster02"
# 路由规则
router:
- name: my_router
# 默认路由
default_flow: write-flow
# 读请求路由
rules:
- method:
- "GET"
- "HEAD"
pattern:
- "/{any:*}"
flow:
- read-flow
- method:
- "POST"
- "GET"
pattern:
- "/_refresh"
- "/_count"
- "/_search"
- "/_msearch"
- "/_mget"
- "/{any_index}/_count"
- "/{any_index}/_search"
- "/{any_index}/_msearch"
- "/{any_index}/_mget"
flow:
- read-flow
# 定义管道, 异步将数据写入备集群
pipeline:
- name: cluster01-consumer
auto_start: true
keep_running: true
processor:
- queue_consumer:
input_queue: "cluster01-queue"
elasticsearch: "cluster01"
when:
cluster_available: ["cluster01"] # 当集群可用时,才消费队列中的数据
- name: cluster02-consumer
auto_start: true
keep_running: true
processor:
- queue_consumer:
input_queue: "cluster02-queue"
elasticsearch: "cluster02"
when:
cluster_available: ["cluster02"]
elastic:
enabled: true
remote_configs: false
health_check:
enabled: true
interval: 1s
availability_check:
enabled: true
interval: 1s
metadata_refresh:
enabled: true
interval: 1s
cluster_settings_check:
enabled: false
interval: 1s

启动网关

[root@k8s-master gateway]# ./bin/gateway

测试准备

建立本地docker es 集群,配置如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
[root@k8s-master gateway]# cat docker-compose.yml
version: '3.8'
services:
# 集群 cluster01
# Elasticsearch
es01:
image: docker.io/library/elasticsearch:7.9.3
container_name: es01
environment:
# 节点名
- node.name=es01
# 集群名
- cluster.name=cluster01
# 指定单节点启动
- discovery.type=single-node
# 开启内存锁定
- bootstrap.memory_lock=true
# 设置内存大小
- "ES_JAVA_OPTS=-Xms2g -Xmx2g"
# 启用安全
- xpack.security.enabled=true
# 设置 elastic 用户密码
- ELASTIC_PASSWORD=IQsMLRniP5BcYfoNzTBT
ulimits:
memlock:
soft: -1
hard: -1
# 映射到主机名的端口 宿主机端口:容器端口
ports:
- 9200:9200
volumes:
- data01:/usr/share/elasticsearch/data
networks:
- elastic
# Kibana
kib01:
image: kibana:7.9.3
container_name: kib01
ports:
- 5601:5601
environment:
# Elasticsearch 连接信息
ELASTICSEARCH_URL: http://es01:9200
ELASTICSEARCH_HOSTS: '["http://es01:9200"]'
ELASTICSEARCH_USERNAME: elastic
ELASTICSEARCH_PASSWORD: IQsMLRniP5BcYfoNzTBT
networks:
- elastic
# 存储卷
volumes:
data01:
driver: local
data02:
driver: local
data03:
driver: local
# 网络
networks:
elastic:
driver: bridge

k8s 集群(已提前搭建)

kubectl exec -it $(kubectl get pods -n esbeta| grep elasticsearch-client | sed -n 1p | awk ‘{print $1}’) -n esbeta – bin/elasticsearch-setup-passwords interactive