WhatAKitty Daily

A Programmer's Daily Record

ES查询中如何获取并解析查询字段

WhatAKitty   阅读次数loading...

背景

最近在学习Elasticsearch相关的一些原理,然后在映像中ES的text字段如果存在keyword的fields,则会在聚合的时候选择keyword类型进行聚合,同时在文本检索的时候根据text类型分词检索。因此,想探究下这个的实现过程。

ES查询原理

需要了解keyword的选择郭成,需要指定如下两点:

  1. 字段在启动过程中如何初始化
  2. 查询的时候如何筛选需要的字段以及选择类型

ES初始化流程

通过以下时序图记录下ES初始化过程中的大致过程:

sequenceDiagram;
  Elasticsearch->>Command: 主程序入口,解析命令参数
  Command->>Command: 注册closeHook,执行主函数前置hook函数beforeMain.run()
  Command->>EnvironmentAwareCommand: 执行环境参数初始化
  EnvironmentAwareCommand->>Elasticsearch: 执行启动具体命令
  Elasticsearch->>Elasticsearch: 参数解析
  Elasticsearch->>Bootstrap: 执行初始化静态方法init
  Bootstrap->>Bootstrap: 实例化Bootstrap,初始化keepAlive线程
  Bootstrap->>Bootstrap: 创建上下文环境,设置环境的日志配置
  Bootstrap->>Bootstrap: 创建PID文件
  Bootstrap->>Bootstrap: 检查luence版本
  Bootstrap->>Bootstrap: 初始化插件进程,初始化节点
  Bootstrap->>Bootstrap: 节点启动以及保活线程启动
  BootStrap->>Node: 尝试启动节点
  Node->>Node: 节点状态更改
  Node->>Node: 各服务初始化启动
  Node->>Node: Netty服务启动
  alt: 当前节点为Master/Data
    Node->>Node: 获取GatewayMetaState实例
    Node->>GatewayMetaState: 执行loadMetaState载入元信息
    GatewayMetaState->>MetaStateService: 执行loadFullState方法载入全局状态以及索引状态
    MetaStateService->>MetaStateService: 载入全局状态
    MetaStateService->>MetaStateService: 查找所有的索引路径
    MetaStateService->>MetaStateService: 解析每个索引路径,载入最新的状态
    MetaStateService->>MetaStateService: 将索引状态放入全局状态内缓存
  else:
    Node->>Node: 创建空的元信息
  end
  Node->>Node: 启动Discovery服务以及Cluster服务
  Node->>Node: 节点Transport服务开始接收请求
  Node->>Node: 初始化集群状态,并监听集群状态变更
  alt: 启用HTTP模块
    Node->>Node: 初始化HTTP服务,并启动
  end
  alt: 是否将端口信息写入文件
    alt: 启用HTTP模块
      Node->>Node: 将http端口写入
    end
    Node->>Node: 将transport端口写入
  end
  Node->>Node: 集群插件执行onNodeStarted方法进行节点初始化

上述流程大致将ES整个启动流程大概讲述了一遍,具体了解下在加载过程中如何解析数据文件并将其加入缓存。

在上述流程中可以找到如下这一块是重点:

sequenceDiagram;
  Node->>Node: 获取GatewayMetaState实例
  Node->>GatewayMetaState: 执行loadMetaState载入元信息
  GatewayMetaState->>MetaStateService: 执行loadFullState方法载入全局状态以及索引状态
  MetaStateService->>MetaStateService: 载入全局状态
  MetaStateService->>MetaStateService: 查找所有的索引路径
  MetaStateService->>MetaStateService: 解析每个索引路径,载入最新的状态
  MetaStateService->>MetaStateService: 将索引状态放入全局状态内缓存

载入每个索引最新的状态代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
public  T loadLatestState(Logger logger, NamedXContentRegistry namedXContentRegistry, Path... dataLocations) throws IOException {
List<PathAndStateId> files = new ArrayList<>();
long maxStateId = -1;
boolean maxStateIdIsLegacy = true;
if (dataLocations != null) {
// 对于数据文件位置列表做遍历
for (Path dataLocation : dataLocations) {
final Path stateDir = dataLocation.resolve(STATE_DIR_NAME);
try (DirectoryStream<Path> paths = Files.newDirectoryStream(stateDir)) {
for (Path stateFile : paths) {
// 筛选状态文件
final Matcher matcher = stateFilePattern.matcher(stateFile.getFileName().toString());
if (matcher.matches()) {
final long stateId = Long.parseLong(matcher.group(1));
maxStateId = Math.max(maxStateId, stateId);
final boolean legacy = MetaDataStateFormat.STATE_FILE_EXTENSION.equals(matcher.group(2)) == false;
maxStateIdIsLegacy &= legacy;
PathAndStateId pav = new PathAndStateId(stateFile, stateId, legacy);
logger.trace("found state file: {}", pav);
// 加入文件列表
files.add(pav);
}
}
} catch (NoSuchFileException | FileNotFoundException ex) {
// no _state directory -- move on
}
}
}
final List<Throwable> exceptions = new ArrayList<>();
T state = null;
// 筛选出最大版本号的最新索引文件
Collection<PathAndStateId> pathAndStateIds = files
.stream()
.filter(new StateIdAndLegacyPredicate(maxStateId, maxStateIdIsLegacy))
.collect(Collectors.toCollection(ArrayList::new));

for (PathAndStateId pathAndStateId : pathAndStateIds) {
try {
final Path stateFile = pathAndStateId.file;
final long id = pathAndStateId.id;
if (pathAndStateId.legacy) {
// 旧版格式
final byte[] data = Files.readAllBytes(stateFile);
if (data.length == 0) {
logger.debug("{}: no data for [{}], ignoring...", prefix, stateFile.toAbsolutePath());
continue;
}
// 解析文件内容
try (XContentParser parser = XContentHelper.createParser(namedXContentRegistry, new BytesArray(data))) {
state = fromXContent(parser);
}
if (state == null) {
logger.debug("{}: no data for [{}], ignoring...", prefix, stateFile.toAbsolutePath());
}
} else {
// 最新版本格式读取
state = read(namedXContentRegistry, stateFile);
logger.trace("state id [{}] read from [{}]", id, stateFile.getFileName());
}
return state;
} catch (Exception e) {
exceptions.add(new IOException("failed to read " + pathAndStateId.toString(), e));
logger.debug(
(Supplier<?>) () -> new ParameterizedMessage(
"{}: failed to read [{}], ignoring...", pathAndStateId.file.toAbsolutePath(), prefix), e);
}
}
// ....
return state;
}

在方法read内会解析真正的文件内容。

ES聚合查询流程

ES每一次查询通过netty服务传递查询命令,以下是聚合查询流程:

sequenceDiagram;
  Netty4HttpRequestHandler->>Netty4HttpRequestHandler: 读取数据
  Netty4HttpRequestHandler->>Netty4HttpServerTransport: 传输请求
  Netty4HttpServerTransport->>RestController: 分发请求
  RestController->>RestController: 遍历所有处理器,查找匹配的handler
  alt 查找到匹配的:
    RestController->>BaseRestHandler: 处理请求
    BaseRestHandler->>RestSearchAction: 解析请求获取具体的处置动作
    RestSearchAction->>SearchSourceBuilder: 根据传递过来的json字符串解析请求内容
    SearchSourceBuilder->>SearchSourceBuilder: 根据string token,判断不同类型,解析不同的数据
    SearchSourceBuilder->>AggregatorFactories: 从字符串中构建出聚合工厂用于后续构建聚合
    AggregatorFactories->>AggregatorFactories: 构建聚合工厂
    AggregatorFactories->>SearchSourceBuilder: 继续解析其他内容
    SearchSourceBuilder->>RestSearchAction: 解析完成
    RestSearchAction->>RestSearchAction: 解析查询类型、路由、索引选项以及深度分页请求信息等等
    RestSearchAction->>BaseRestHandler: 返回预处理完成的Action
    alt: 存在有参数未处理
      BaseRestHandler->>BaseRestHandler: 抛出异常
    end
    BaseRestHandler->>BaseRestHandler: 使用计数+1
    BaseRestHandler->>RestSearchAction: 将请求交由RestSearchAction处理
    RestSearchAction->>TransportSearchAction: 查询action交由具体Transport执行
    TransportSearchAction->>TransportSearchAction: 根据state以及请求获取具体的索引
    TransportSearchAction->>TransportSearchAction: 预处理别名
    TransportSearchAction->>TransportSearchAction: 查询请求路由
    TransportSearchAction->>TransportSearchAction: 获取索引在所有shards上的遍历器
    TransportSearchAction->>AbstractSearchAsyncAction: 异步查询解析
    AbstractSearchAsyncAction->>InitialSearchPhase: 执行异步查询请求
    InitialSearchPhase->>InitialSearchPhase: 分片遍历并在每个分片上执行查询
    InitialSearchPhase->>SearchQueryThenFetchAsyncAction: 传递分片信息以及查询请求
    SearchQueryThenFetchAsyncAction->>TransportService: 将查询异步请求交由Transport服务来执行
    TransportService->>TransportService: 交由localNodeConnection执行查询请求
    TransportService->>SearchTransportService: 将请求传递
    SearchTransportService->>SearchService: 执行searchService请求查询传入请求以及回调监听
    SearchService->>SearchService: 执行查询解析并查询具体结果
    SearchService->>QueryPhase: 查询解析
    QueryPhase->>AggregationPhase: 聚合预处理,构建顶层聚合
    AggregationPhase->>QueryPhase: 预处理完成
    QueryPhase->>QueryPhase: 聚合语句执行
    QueryPhase->>QueryPhase: 返回执行结果
    QueryPhase->>SearchService: 结果返回
  end

上述的整个流程过程中,有三处地方对于聚合做了处理:

  1. 请求字符串解析出聚合构建器
  2. 预处理聚合构建器,构建出top聚合
  3. 聚合语句执行

这三处地方,并没有对于聚合字段做特殊判断,比如在聚合的时候会直接选择keyword属性。而是需要在请求的时候,显示指定字段类型为keyword类型的标识。

总结

如果需要在聚合的时候直接使用keyword,则显示指定它。