“Java Elasticsearch:Document APIs”的版本间差异

来自Wikioe
跳到导航 跳到搜索
(建立内容为“category:ElasticSearch category:Java == Index Api == Java Elasticsearch Index API 主要用于插入或者更新文档数据。 === 创建Index Request…”的新页面)
 
无编辑摘要
 
(未显示同一用户的15个中间版本)
第1行: 第1行:
[[category:ElasticSearch]]
[[category:Java&ElasticSearch]]
[[category:Java]]


== Index Api ==
== Index Api ==
Java Elasticsearch Index API 主要用于插入或者更新文档数据。
Java Elasticsearch Index API 主要用于'''插入'''或者'''更新'''文档数据。


=== 创建Index Request ===
=== 创建“IndexRequest” ===
<syntaxhighlight lang="Java" highlight="">
<syntaxhighlight lang="Java" highlight="">
// 创建Index Request,设置索引名为: posts
// 创建Index Request,设置索引名为: posts
第15行: 第14行:


=== 设置文档内容 ===
=== 设置文档内容 ===
支持以'''JSON字符串形式'''或者'''Map形式'''设置文档内容。
支持以'''JSON字符串'''方式或者'''map'''方式设置文档内容。


 
==== JSON字符串方式 ====
JSON字符串形式:
<syntaxhighlight lang="Java" highlight="">
<syntaxhighlight lang="Java" highlight="">
// 创建 JSON字符串
// 创建 JSON字符串
第31行: 第29行:
</syntaxhighlight>
</syntaxhighlight>


 
==== map方式 ====
Map形式:
<syntaxhighlight lang="Java" highlight="">
<syntaxhighlight lang="Java" highlight="">
// 创建 Map
// 创建 Map
第79行: 第76行:


==== 异步执行 ====
==== 异步执行 ====
* 异步执行通过回调方法进行后续操作,不需要通过“xxxResponse”获取返回结果。
<syntaxhighlight lang="Java" highlight="">
<syntaxhighlight lang="Java" highlight="">
IndexResponse indexResponse = client.indexAsync(request, RequestOptions.DEFAULT, new ActionListener<IndexResponse>() {
client.indexAsync(request, RequestOptions.DEFAULT, new ActionListener<IndexResponse>() {
             @Override
             @Override
             public void onResponse(IndexResponse indexResponse) {
             public void onResponse(IndexResponse indexResponse) {
第93行: 第92行:
</syntaxhighlight>
</syntaxhighlight>


=== 处理请求结果 ===
=== 处理结果 ===
<syntaxhighlight lang="Java" highlight="">
<syntaxhighlight lang="Java" highlight="">
// 获取索引名
// 获取索引名
第108行: 第107行:


== Get Api ==
== Get Api ==
Get Api 主要用于根据文档ID'''查询'''索引数据。
=== 创建“GetRequest” ===
<syntaxhighlight lang="Java" highlight="">
<syntaxhighlight lang="Java" highlight="">
// 创建GetRequest,索引名=posts, 文档ID=1
GetRequest getRequest = new GetRequest("posts", "1");
</syntaxhighlight>


=== 可选参数 ===
==== 是否返回文档内容 ====
* 默认返回文档内容
<syntaxhighlight lang="Java" highlight="">
// 不返回文档内容
request.fetchSourceContext(FetchSourceContext.DO_NOT_FETCH_SOURCE);
</syntaxhighlight>
</syntaxhighlight>


==== 返回、过滤:指定字段 ====
<syntaxhighlight lang="Java" highlight="">
// 设置返回指定字段
String[] includes = new String[]{"message", "*Date"};
//String[] includes = Strings.EMPTY_ARRAY;
// 过滤指定字段
String[] excludes = new String[]{"message"};
//String[] excludes = Strings.EMPTY_ARRAY;


<syntaxhighlight lang="Java" highlight="">
FetchSourceContext fetchSourceContext = new FetchSourceContext(true, includes, excludes);


request.fetchSourceContext(fetchSourceContext);
</syntaxhighlight>
</syntaxhighlight>


==== routing ====
<syntaxhighlight lang="Java" highlight="">
request.routing("routing");
</syntaxhighlight>


==== Version ====
<syntaxhighlight lang="Java" highlight="">
<syntaxhighlight lang="Java" highlight="">
request.version(2);
</syntaxhighlight>


=== 执行请求 ===
==== 同步执行 ====
<syntaxhighlight lang="Java" highlight="">
GetResponse getResponse = client.get(getRequest, RequestOptions.DEFAULT);
</syntaxhighlight>
</syntaxhighlight>


==== 异步执行 ====
* 异步执行通过回调方法进行后续操作,不需要通过“xxxResponse”获取返回结果。


<syntaxhighlight lang="Java" highlight="">
<syntaxhighlight lang="Java" highlight="">
client.getAsync(request, RequestOptions.DEFAULT, new ActionListener<GetResponse>() {
            @Override
            public void onResponse(GetResponse getResponse) {
                // 请求成功回调函数
            }


            @Override
            public void onFailure(Exception e) {
                // 请求失败回调函数
            }
        });
</syntaxhighlight>
</syntaxhighlight>


 
=== 处理结果 ===
<syntaxhighlight lang="Java" highlight="">
<syntaxhighlight lang="Java" highlight="">
String index = getResponse.getIndex();
String id = getResponse.getId();


// 检测索引是否存在
if (getResponse.isExists()) {
    // 获取版本号
    long version = getResponse.getVersion();
   
    // 获取文档内容,json字符串形式
    String sourceAsString = getResponse.getSourceAsString();   
    // 获取文档内容,map形式
    Map<String, Object> sourceAsMap = getResponse.getSourceAsMap();
    // 获取文档内容,字节数组形式
    byte[] sourceAsBytes = getResponse.getSourceAsBytes();         
} else {
    //
}
</syntaxhighlight>
</syntaxhighlight>


== Delete API ==
Delete API 主要用于根据文档ID'''删除'''索引文档。


=== 创建“DeleteRequest” ===
<syntaxhighlight lang="Java" highlight="">
<syntaxhighlight lang="Java" highlight="">
// 设置索引名=posts, 文档id=1
DeleteRequest request = new DeleteRequest("posts", "1");
</syntaxhighlight>


=== 可选参数 ===
==== routing ====
<syntaxhighlight lang="Java" highlight="">
request.routing("routing");
</syntaxhighlight>
</syntaxhighlight>


==== timeout ====
<syntaxhighlight lang="Java" highlight="">
request.timeout(TimeValue.timeValueMinutes(2));    //格式1: 2分钟
request.timeout("2m");    //格式2:2分钟
</syntaxhighlight>


==== Version ====
<syntaxhighlight lang="Java" highlight="">
<syntaxhighlight lang="Java" highlight="">
request.version(2);
</syntaxhighlight>


=== 执行请求 ===
==== 同步执行 ====
<syntaxhighlight lang="Java" highlight="">
DeleteResponse deleteResponse = client.delete(request, RequestOptions.DEFAULT);
</syntaxhighlight>
</syntaxhighlight>


==== 异步执行 ====
* 异步执行通过回调方法进行后续操作,不需要通过“xxxResponse”获取返回结果。


<syntaxhighlight lang="Java" highlight="">
<syntaxhighlight lang="Java" highlight="">
client.deleteAsync(request, RequestOptions.DEFAULT, new ActionListener<DeleteResponse>() {
            @Override
            public void onResponse(DeleteResponse deleteResponse) {
                // 请求成功回调函数
            }


            @Override
            public void onFailure(Exception e) {
                // 请求失败回调函数
            }
        });
</syntaxhighlight>
</syntaxhighlight>


== Update API ==
Elasticsearch Update API 根据文档ID'''更新'''文档内容。
主要支持两种方式更新文档内容:
# '''通过脚本的方式更新''';
# '''更新文档部分字段'''。
* 如果被更新的文档不存在,也支持通过 '''upsert''' api 实现'''插入'''操作。


=== 创建“UpdateRequest” ===
<syntaxhighlight lang="Java" highlight="">
<syntaxhighlight lang="Java" highlight="">
// 创建UpdateRequest请求,索引名=posts,文档ID=1
UpdateRequest request = new UpdateRequest("posts", "1");
</syntaxhighlight>


</syntaxhighlight>
=== 设置更新内容 ===
UpdateRequest 对象支持下面几种更新文档内容的方式,根据需要选择一种方式即可:


==== 脚本方式 ====
通过 ES 内置 '''script''' 脚本更新文档内容。


<syntaxhighlight lang="Java" highlight="">
<syntaxhighlight lang="Java" highlight="">
// 定义脚本参数
Map<String, Object> parameters = singletonMap("count", 4);
// 创建inline脚本,使用painless语言,实现field字段 + count参数值
Script inline = new Script(ScriptType.INLINE, "painless",
        "ctx._source.field += params.count", parameters); 


// 设置脚本
request.script(inline);
</syntaxhighlight>
</syntaxhighlight>


==== map方式 ====
通过 '''map 对象''' 更新文档部分内容。


<syntaxhighlight lang="Java" highlight="">
<syntaxhighlight lang="Java" highlight="">
// 通过map对象设置需要更新的字段内容
Map<String, Object> jsonMap = new HashMap<>();
jsonMap.put("updated", new Date());
jsonMap.put("reason", "daily update");


// 设置更新内容
request.doc(jsonMap);
</syntaxhighlight>
</syntaxhighlight>


==== json字符串方式 ====
通过 '''json字符串''' 方式更新文档部分内容。


<syntaxhighlight lang="Java" highlight="">
<syntaxhighlight lang="Java" highlight="">
String jsonString = "{" +
        "\"updated\":\"2017-01-01\"," +
        "\"reason\":\"daily update\"" +
        "}";
request.doc(jsonString, XContentType.JSON);
</syntaxhighlight>


==== upsert方式 ====
通过 '''Upsert''' 方式更新文档内容。
* 跟前面三种类似,'''支持json字符串、map、脚本方式''';
* 但是有一点区别,'''如果被更新的文档不存在,则会执行插入操作'''。
如:(json字符串方式)
<syntaxhighlight lang="Java" highlight="">
String jsonString = "{\"created\":\"2017-01-01\"}";
request.upsert(jsonString, XContentType.JSON);
</syntaxhighlight>
</syntaxhighlight>


=== 可选参数 ===
==== routing ====
<syntaxhighlight lang="Java" highlight="">
request.routing("routing");
</syntaxhighlight>


==== timeout ====
<syntaxhighlight lang="Java" highlight="">
<syntaxhighlight lang="Java" highlight="">
request.timeout(TimeValue.timeValueSeconds(1)); // 方式1:1秒


request.timeout("1s"); // 方式2:1秒
</syntaxhighlight>
</syntaxhighlight>


==== 版本冲突重试 ====
如果更新文档的时候出现版本冲突,重试几次。


<syntaxhighlight lang="Java" highlight="">
<syntaxhighlight lang="Java" highlight="">
 
request.retryOnConflict(3);
</syntaxhighlight>
</syntaxhighlight>


==== 并发控制 ====
设置并发控制参数,新版的ES已经废弃老的 version 字段。


<syntaxhighlight lang="Java" highlight="">
<syntaxhighlight lang="Java" highlight="">
// 设置版本号
request.setIfSeqNo(2L);
// 设置文档所在的主分区
request.setIfPrimaryTerm(1L);
</syntaxhighlight>


=== 执行请求 ===
==== 同步执行 ====
<syntaxhighlight lang="Java" highlight="">
UpdateResponse updateResponse = client.update(request, RequestOptions.DEFAULT);
</syntaxhighlight>
</syntaxhighlight>


==== 异步执行 ====
* 异步执行通过回调方法进行后续操作,不需要通过“xxxResponse”获取返回结果。


<syntaxhighlight lang="Java" highlight="">
<syntaxhighlight lang="Java" highlight="">
client.updateAsync(request, RequestOptions.DEFAULT, new ActionListener<UpdateResponse>() {
            @Override
            public void onResponse(UpdateResponse updateResponse) {
              // 执行成功
            }


            @Override
            public void onFailure(Exception e) {
              // 执行失败
            }
        });
</syntaxhighlight>
</syntaxhighlight>


 
=== 处理结果 ===
<syntaxhighlight lang="Java" highlight="">
<syntaxhighlight lang="Java" highlight="">
// 索引名
String index = updateResponse.getIndex();
// 文档id
String id = updateResponse.getId();
// 版本号
long version = updateResponse.getVersion();


if (updateResponse.getResult() == DocWriteResponse.Result.CREATED) {
    // 成功创建文档
} else if (updateResponse.getResult() == DocWriteResponse.Result.UPDATED) {
    // 成功更新文档
}
</syntaxhighlight>
</syntaxhighlight>


== Update By Query API(批量更新) ==
ES update by query api主要用于'''批量更新'''文档内容,支持'''设置查询条件限制更新'''文档的范围。


=== 创建“UpdateByQueryRequest” ===
<syntaxhighlight lang="Java" highlight="">
<syntaxhighlight lang="Java" highlight="">
// 创建UpdateByQueryRequest对象,设置索引名,支持一次更新多个索引
// 同时更新source1和source2索引内容
UpdateByQueryRequest request = new UpdateByQueryRequest("source1", "source2");
</syntaxhighlight>
=== 版本冲突 ===
批量更新内容的时候,可能会遇到文档版本冲突的情况,需要设置版本冲突的时候如何处理。


</syntaxhighlight>
版本冲突解决方案如下:
* '''proceed''':忽略版本冲突,继续执行
* '''abort''':遇到版本冲突,中断执行




<syntaxhighlight lang="Java" highlight="">
<syntaxhighlight lang="Java" highlight="">
 
request.setConflicts("proceed");
</syntaxhighlight>
</syntaxhighlight>


=== 设置查询条件 ===
* (ES的查询语法是非常丰富的,这里仅给出一种写法)


<syntaxhighlight lang="Java" highlight="">
<syntaxhighlight lang="Java" highlight="">
 
// 设置term查询条件,查询user字段=kimchy的文档内容
request.setQuery(new TermQueryBuilder("user", "kimchy"));
</syntaxhighlight>
</syntaxhighlight>


=== 限制更新文档数量 ===
可以限制批量更新文档的数量。


<syntaxhighlight lang="Java" highlight="">
<syntaxhighlight lang="Java" highlight="">
 
request.setMaxDocs(10);
</syntaxhighlight>
</syntaxhighlight>


=== 设置更新内容 ===
Update by query api更新文档内容,'''仅支持通过【脚本方式】修改文档内容'''。


<syntaxhighlight lang="Java" highlight="">
<syntaxhighlight lang="Java" highlight="">
request.setScript(
    new Script( // 创建inline脚本,使用painless语言。
        ScriptType.INLINE, "painless",
        "if (ctx._source.user == 'kimchy') {ctx._source.likes++;}",
        Collections.emptyMap()));
</syntaxhighlight>


=== 执行请求 ===
<syntaxhighlight lang="Java" highlight="">
BulkByScrollResponse bulkResponse = client.updateByQuery(request, RequestOptions.DEFAULT);
</syntaxhighlight>
</syntaxhighlight>


 
=== 处理结果 ===
<syntaxhighlight lang="Java" highlight="">
<syntaxhighlight lang="Java" highlight="">
 
TimeValue timeTaken = bulkResponse.getTook(); // 更新花费时间
boolean timedOut = bulkResponse.isTimedOut(); // 是否超时
long totalDocs = bulkResponse.getTotal(); // 更新文档总数
long updatedDocs = bulkResponse.getUpdated(); // 成功更新了多少文档
long deletedDocs = bulkResponse.getDeleted(); // 删除了多少文档
long versionConflicts = bulkResponse.getVersionConflicts(); // 版本冲突次数
</syntaxhighlight>
</syntaxhighlight>


== Delete By Query API(批量删除) ==
Java ES Delete By Query API 主要用于'''批量删除'''操作,支持'''设置查询条件'''。


=== 创建“DeleteByQueryRequest” ===
<syntaxhighlight lang="Java" highlight="">
<syntaxhighlight lang="Java" highlight="">
 
// 创建 DeleteByQueryRequest 对象,支持同时操作多个索引
// 设置批量删除的索引名为:source1和source2
DeleteByQueryRequest request = new DeleteByQueryRequest("source1", "source2");
</syntaxhighlight>
</syntaxhighlight>


=== 版本冲突 ===
批量更新内容的时候,可能会遇到文档版本冲突的情况,需要设置版本冲突的时候如何处理。


<syntaxhighlight lang="Java" highlight="">


</syntaxhighlight>
版本冲突解决方案如下:
* '''proceed''':忽略版本冲突,继续执行
* '''abort''':遇到版本冲突,中断执行




<syntaxhighlight lang="Java" highlight="">
<syntaxhighlight lang="Java" highlight="">
 
request.setConflicts("proceed");
</syntaxhighlight>
</syntaxhighlight>


=== 设置查询条件 ===
* (ES的查询语法是非常丰富的,这里仅给出一种写法)


<syntaxhighlight lang="Java" highlight="">
<syntaxhighlight lang="Java" highlight="">
// 设置term查询条件,查询user字段=kimchy的文档内容
request.setQuery(new TermQueryBuilder("user", "kimchy"));
</syntaxhighlight>


=== 限制删除文档数量 ===
<syntaxhighlight lang="Java" highlight="">
request.setMaxDocs(10);
</syntaxhighlight>
</syntaxhighlight>


=== 执行请求 ===
<syntaxhighlight lang="Java" highlight="">
BulkByScrollResponse bulkResponse = client.deleteByQuery(request, RequestOptions.DEFAULT);
</syntaxhighlight>


=== 处理结果 ===
<syntaxhighlight lang="Java" highlight="">
<syntaxhighlight lang="Java" highlight="">
 
TimeValue timeTaken = bulkResponse.getTook(); // 批量操作消耗时间
boolean timedOut = bulkResponse.isTimedOut(); // 是否超时
long totalDocs = bulkResponse.getTotal(); // 涉及文档总数
long deletedDocs = bulkResponse.getDeleted(); // 成功删除文档数量
long versionConflicts = bulkResponse.getVersionConflicts(); // 版本冲突次数
</syntaxhighlight>
</syntaxhighlight>


== Multi-Get API(批量查询) ==
Multi-Get API 主要用于'''根据id集合''','''批量查询'''文档内容,支持查询多个索引内容。


=== 创建“MultiGetRequest” ===
<syntaxhighlight lang="Java" highlight="">
<syntaxhighlight lang="Java" highlight="">
MultiGetRequest request = new MultiGetRequest();
// 通过 MultiGetRequest.Item 对象设置查询参数
// 添加另外一组查询参数,索引名=index, 索引Id=12345
request.add(new MultiGetRequest.Item("index", "12345"));  // 文档id


// 添加另外一组查询参数,索引名=index, 索引Id=another_id
request.add(new MultiGetRequest.Item("index", "another_id"));
</syntaxhighlight>
</syntaxhighlight>


 
=== 执行请求 ===
<syntaxhighlight lang="Java" highlight="">
<syntaxhighlight lang="Java" highlight="">
 
MultiGetResponse response = client.mget(request, RequestOptions.DEFAULT);
</syntaxhighlight>
</syntaxhighlight>


=== 处理结果 ===
* 每个 '''MultiGetItemResponse''' 对象代表一个查询结果。


<syntaxhighlight lang="Java" highlight="">
<syntaxhighlight lang="Java" highlight="">
// response.getResponses 返回多个 MultiGetItemResponse 对象,每个MultiGetItemResponse对象代表一个查询结果,这里以其中一个结果为例。
// ps: 通常是需要循环遍历response.getResponses返回的结果
MultiGetItemResponse firstItem = response.getResponses()[0];
assertNull(firstItem.getFailure());


GetResponse firstGet = firstItem.getResponse();
String index = firstItem.getIndex();  // 获取索引名
String id = firstItem.getId();  // 获取文档Id
if (firstGet.isExists()) { // 检测文档是否存在
    // 获取版本号
    long version = firstGet.getVersion();
   
    // 查询文档内容,json字符串格式
    String sourceAsString = firstGet.getSourceAsString();     
    // 查询文档内容,Map对象格式 
    Map<String, Object> sourceAsMap = firstGet.getSourceAsMap();
} else {
   
}
</syntaxhighlight>
</syntaxhighlight>


== Bulk API(批量操作) ==
ES 的 Bulk API 主要用于在'''单个请求中,批量执行创建、更新、删除文档操作''',避免循环发送大量的 ES 请求。


=== 创建“BulkRequest” ===
<syntaxhighlight lang="Java" highlight="">
<syntaxhighlight lang="Java" highlight="">
 
BulkRequest request = new BulkRequest();
</syntaxhighlight>
</syntaxhighlight>


=== 添加操作对象 ===
支持 '''index'''/'''update'''/'''delete''' 操作。


==== 批量index操作 ====
<syntaxhighlight lang="Java" highlight="">
<syntaxhighlight lang="Java" highlight="">
 
// 通过add方法,添加IndexRequest对象,创建文档,下面插入3个文档
// ps: IndexRequest对象,以键值对的方式设置文档内容
request.add(new IndexRequest("posts").id("1").source(XContentType.JSON, "field", "foo"));
request.add(new IndexRequest("posts").id("2").source(XContentType.JSON, "field", "bar"));
request.add(new IndexRequest("posts").id("3").source(XContentType.JSON, "field", "baz"));
</syntaxhighlight>
</syntaxhighlight>


==== 混合操作 ====
批量执行文档的删除、更新、创建操作。


<syntaxhighlight lang="Java" highlight="">
<syntaxhighlight lang="Java" highlight="">
request.add(new DeleteRequest("posts", "3"));
request.add(new UpdateRequest("posts", "2").doc(XContentType.JSON,"other", "test"));
request.add(new IndexRequest("posts").id("4").source(XContentType.JSON,"field", "baz"));
</syntaxhighlight>


=== 可选参数 ===
==== timeout ====
<syntaxhighlight lang="Java" highlight="">
request.timeout(TimeValue.timeValueMinutes(2)); // 形式1:2分钟
request.timeout("2m"); // 形式2:2分钟
</syntaxhighlight>
</syntaxhighlight>


=== 执行请求 ===
<syntaxhighlight lang="Java" highlight="">
BulkResponse bulkResponse = client.bulk(request, RequestOptions.DEFAULT);
</syntaxhighlight>


=== 处理结果 ===
<syntaxhighlight lang="Java" highlight="">
<syntaxhighlight lang="Java" highlight="">
if (bulkResponse.hasFailures()) {
    // 至少存在一个错误处理
}


// 循环检测批量操作结果
for (BulkItemResponse bulkItemResponse : bulkResponse) {
    DocWriteResponse itemResponse = bulkItemResponse.getResponse();
   
    // 根据操作类型检测执行结果
    switch (bulkItemResponse.getOpType()) {
        case INDEX:   
        case CREATE:
            // 处理创建请求
            IndexResponse indexResponse = (IndexResponse) itemResponse;
            break;
        case UPDATE: 
            // 处理更新请求
            UpdateResponse updateResponse = (UpdateResponse) itemResponse;
            break;
        case DELETE: 
            // 处理删除请求
            DeleteResponse deleteResponse = (DeleteResponse) itemResponse;
    }
}
</syntaxhighlight>
</syntaxhighlight>

2023年3月31日 (五) 22:03的最新版本


Index Api

Java Elasticsearch Index API 主要用于插入或者更新文档数据。

创建“IndexRequest”

// 创建Index Request,设置索引名为: posts
IndexRequest request = new IndexRequest("posts"); 

// 设置文档ID
request.id("1");

设置文档内容

支持以JSON字符串方式或者map方式设置文档内容。

JSON字符串方式

// 创建 JSON字符串
String jsonString = "{" +
        "\"user\":\"kimchy\"," +
        "\"postDate\":\"2013-01-30\"," +
        "\"message\":\"trying out Elasticsearch\"" +
        "}";

// 设置文档内容
request.source(jsonString, XContentType.JSON);

map方式

// 创建 Map
Map<String, Object> jsonMap = new HashMap<>();
jsonMap.put("user", "kimchy");
jsonMap.put("postDate", new Date());
jsonMap.put("message", "trying out Elasticsearch");

// 设置文档内容
request.source(jsonMap);

可选参数

routing

设置路由字段

request.routing("routing");

timeout

设置单个请求超时参数

request.timeout(TimeValue.timeValueSeconds(1)); 
// or
request.timeout("1s");

Version

设置文档版本

request.version(2);

操作类型

Index api支持两类操作:create 或者 index (默认)

request.opType("create");

执行请求

同步执行

IndexResponse indexResponse = client.index(request, RequestOptions.DEFAULT);

异步执行

  • 异步执行通过回调方法进行后续操作,不需要通过“xxxResponse”获取返回结果。
client.indexAsync(request, RequestOptions.DEFAULT, new ActionListener<IndexResponse>() {
            @Override
            public void onResponse(IndexResponse indexResponse) {
                // 请求成功回调函数
            }

            @Override
            public void onFailure(Exception e) {
                // 请求失败回调函数
            }
        });

处理结果

// 获取索引名
String index = indexResponse.getIndex();
// 获取文档ID
String id = indexResponse.getId();

if (indexResponse.getResult() == DocWriteResponse.Result.CREATED) {
    // 成功创建文档
} else if (indexResponse.getResult() == DocWriteResponse.Result.UPDATED) {
    // 成功更新文档
}

Get Api

Get Api 主要用于根据文档ID查询索引数据。

创建“GetRequest”

// 创建GetRequest,索引名=posts, 文档ID=1
GetRequest getRequest = new GetRequest("posts", "1");

可选参数

是否返回文档内容

  • 默认返回文档内容


// 不返回文档内容
request.fetchSourceContext(FetchSourceContext.DO_NOT_FETCH_SOURCE);

返回、过滤:指定字段

// 设置返回指定字段
String[] includes = new String[]{"message", "*Date"};
//String[] includes = Strings.EMPTY_ARRAY;

// 过滤指定字段
String[] excludes = new String[]{"message"};
//String[] excludes = Strings.EMPTY_ARRAY;

FetchSourceContext fetchSourceContext = new FetchSourceContext(true, includes, excludes);

request.fetchSourceContext(fetchSourceContext);

routing

request.routing("routing");

Version

request.version(2);

执行请求

同步执行

GetResponse getResponse = client.get(getRequest, RequestOptions.DEFAULT);

异步执行

  • 异步执行通过回调方法进行后续操作,不需要通过“xxxResponse”获取返回结果。
client.getAsync(request, RequestOptions.DEFAULT, new ActionListener<GetResponse>() {
            @Override
            public void onResponse(GetResponse getResponse) {
                // 请求成功回调函数
            }

            @Override
            public void onFailure(Exception e) {
                // 请求失败回调函数
            }
        });

处理结果

String index = getResponse.getIndex();
String id = getResponse.getId();

// 检测索引是否存在
if (getResponse.isExists()) {
    // 获取版本号
    long version = getResponse.getVersion();
    
    // 获取文档内容,json字符串形式
    String sourceAsString = getResponse.getSourceAsString();    
    // 获取文档内容,map形式
    Map<String, Object> sourceAsMap = getResponse.getSourceAsMap(); 
    // 获取文档内容,字节数组形式
    byte[] sourceAsBytes = getResponse.getSourceAsBytes();          
} else {
    //
}

Delete API

Delete API 主要用于根据文档ID删除索引文档。

创建“DeleteRequest”

// 设置索引名=posts, 文档id=1
DeleteRequest request = new DeleteRequest("posts", "1");

可选参数

routing

request.routing("routing");

timeout

request.timeout(TimeValue.timeValueMinutes(2));    //格式1: 2分钟

request.timeout("2m");    //格式2:2分钟

Version

request.version(2);

执行请求

同步执行

DeleteResponse deleteResponse = client.delete(request, RequestOptions.DEFAULT);

异步执行

  • 异步执行通过回调方法进行后续操作,不需要通过“xxxResponse”获取返回结果。
client.deleteAsync(request, RequestOptions.DEFAULT, new ActionListener<DeleteResponse>() {
            @Override
            public void onResponse(DeleteResponse deleteResponse) {
                // 请求成功回调函数
            }

            @Override
            public void onFailure(Exception e) {
                // 请求失败回调函数
            }
        });

Update API

Elasticsearch Update API 根据文档ID更新文档内容。


主要支持两种方式更新文档内容:

  1. 通过脚本的方式更新
  2. 更新文档部分字段


  • 如果被更新的文档不存在,也支持通过 upsert api 实现插入操作。

创建“UpdateRequest”

// 创建UpdateRequest请求,索引名=posts,文档ID=1
UpdateRequest request = new UpdateRequest("posts", "1");

设置更新内容

UpdateRequest 对象支持下面几种更新文档内容的方式,根据需要选择一种方式即可:

脚本方式

通过 ES 内置 script 脚本更新文档内容。

// 定义脚本参数
Map<String, Object> parameters = singletonMap("count", 4); 

// 创建inline脚本,使用painless语言,实现field字段 + count参数值
Script inline = new Script(ScriptType.INLINE, "painless",
        "ctx._source.field += params.count", parameters);  

// 设置脚本
request.script(inline);

map方式

通过 map 对象 更新文档部分内容。

// 通过map对象设置需要更新的字段内容
Map<String, Object> jsonMap = new HashMap<>();
jsonMap.put("updated", new Date());
jsonMap.put("reason", "daily update");

// 设置更新内容
request.doc(jsonMap);

json字符串方式

通过 json字符串 方式更新文档部分内容。

String jsonString = "{" +
        "\"updated\":\"2017-01-01\"," +
        "\"reason\":\"daily update\"" +
        "}";
request.doc(jsonString, XContentType.JSON);

upsert方式

通过 Upsert 方式更新文档内容。

  • 跟前面三种类似,支持json字符串、map、脚本方式
  • 但是有一点区别,如果被更新的文档不存在,则会执行插入操作

如:(json字符串方式)

String jsonString = "{\"created\":\"2017-01-01\"}";
request.upsert(jsonString, XContentType.JSON);

可选参数

routing

request.routing("routing");

timeout

request.timeout(TimeValue.timeValueSeconds(1)); // 方式1:1秒

request.timeout("1s"); // 方式2:1秒

版本冲突重试

如果更新文档的时候出现版本冲突,重试几次。

request.retryOnConflict(3);

并发控制

设置并发控制参数,新版的ES已经废弃老的 version 字段。

// 设置版本号
request.setIfSeqNo(2L); 
// 设置文档所在的主分区
request.setIfPrimaryTerm(1L);

执行请求

同步执行

UpdateResponse updateResponse = client.update(request, RequestOptions.DEFAULT);

异步执行

  • 异步执行通过回调方法进行后续操作,不需要通过“xxxResponse”获取返回结果。
client.updateAsync(request, RequestOptions.DEFAULT, new ActionListener<UpdateResponse>() {
            @Override
            public void onResponse(UpdateResponse updateResponse) {
              // 执行成功
            }

            @Override
            public void onFailure(Exception e) {
               // 执行失败
            }
        });

处理结果

// 索引名
String index = updateResponse.getIndex();
// 文档id
String id = updateResponse.getId();
// 版本号
long version = updateResponse.getVersion();

if (updateResponse.getResult() == DocWriteResponse.Result.CREATED) {
    // 成功创建文档
} else if (updateResponse.getResult() == DocWriteResponse.Result.UPDATED) {
    // 成功更新文档
}

Update By Query API(批量更新)

ES update by query api主要用于批量更新文档内容,支持设置查询条件限制更新文档的范围。

创建“UpdateByQueryRequest”

// 创建UpdateByQueryRequest对象,设置索引名,支持一次更新多个索引
// 同时更新source1和source2索引内容
UpdateByQueryRequest request = new UpdateByQueryRequest("source1", "source2");

版本冲突

批量更新内容的时候,可能会遇到文档版本冲突的情况,需要设置版本冲突的时候如何处理。


版本冲突解决方案如下:

  • proceed:忽略版本冲突,继续执行
  • abort:遇到版本冲突,中断执行


request.setConflicts("proceed");

设置查询条件

  • (ES的查询语法是非常丰富的,这里仅给出一种写法)
// 设置term查询条件,查询user字段=kimchy的文档内容
request.setQuery(new TermQueryBuilder("user", "kimchy"));

限制更新文档数量

可以限制批量更新文档的数量。

request.setMaxDocs(10);

设置更新内容

Update by query api更新文档内容,仅支持通过【脚本方式】修改文档内容

request.setScript(
    new Script( // 创建inline脚本,使用painless语言。
        ScriptType.INLINE, "painless",
        "if (ctx._source.user == 'kimchy') {ctx._source.likes++;}",
        Collections.emptyMap()));

执行请求

BulkByScrollResponse bulkResponse = client.updateByQuery(request, RequestOptions.DEFAULT);

处理结果

TimeValue timeTaken = bulkResponse.getTook(); // 更新花费时间
boolean timedOut = bulkResponse.isTimedOut(); // 是否超时
long totalDocs = bulkResponse.getTotal(); // 更新文档总数
long updatedDocs = bulkResponse.getUpdated(); // 成功更新了多少文档
long deletedDocs = bulkResponse.getDeleted(); // 删除了多少文档
long versionConflicts = bulkResponse.getVersionConflicts(); // 版本冲突次数

Delete By Query API(批量删除)

Java ES Delete By Query API 主要用于批量删除操作,支持设置查询条件

创建“DeleteByQueryRequest”

// 创建 DeleteByQueryRequest 对象,支持同时操作多个索引
// 设置批量删除的索引名为:source1和source2
DeleteByQueryRequest request = new DeleteByQueryRequest("source1", "source2");

版本冲突

批量更新内容的时候,可能会遇到文档版本冲突的情况,需要设置版本冲突的时候如何处理。


版本冲突解决方案如下:

  • proceed:忽略版本冲突,继续执行
  • abort:遇到版本冲突,中断执行


request.setConflicts("proceed");

设置查询条件

  • (ES的查询语法是非常丰富的,这里仅给出一种写法)
// 设置term查询条件,查询user字段=kimchy的文档内容
request.setQuery(new TermQueryBuilder("user", "kimchy"));

限制删除文档数量

request.setMaxDocs(10);

执行请求

BulkByScrollResponse bulkResponse = client.deleteByQuery(request, RequestOptions.DEFAULT);

处理结果

TimeValue timeTaken = bulkResponse.getTook(); // 批量操作消耗时间
boolean timedOut = bulkResponse.isTimedOut(); // 是否超时
long totalDocs = bulkResponse.getTotal(); // 涉及文档总数
long deletedDocs = bulkResponse.getDeleted(); // 成功删除文档数量
long versionConflicts = bulkResponse.getVersionConflicts(); // 版本冲突次数

Multi-Get API(批量查询)

Multi-Get API 主要用于根据id集合批量查询文档内容,支持查询多个索引内容。

创建“MultiGetRequest”

MultiGetRequest request = new MultiGetRequest();

// 通过 MultiGetRequest.Item 对象设置查询参数
// 添加另外一组查询参数,索引名=index, 索引Id=12345
request.add(new MultiGetRequest.Item("index", "12345"));  // 文档id

// 添加另外一组查询参数,索引名=index, 索引Id=another_id
request.add(new MultiGetRequest.Item("index", "another_id"));

执行请求

MultiGetResponse response = client.mget(request, RequestOptions.DEFAULT);

处理结果

  • 每个 MultiGetItemResponse 对象代表一个查询结果。
// response.getResponses 返回多个 MultiGetItemResponse 对象,每个MultiGetItemResponse对象代表一个查询结果,这里以其中一个结果为例。
// ps: 通常是需要循环遍历response.getResponses返回的结果
MultiGetItemResponse firstItem = response.getResponses()[0];

assertNull(firstItem.getFailure());

GetResponse firstGet = firstItem.getResponse();
String index = firstItem.getIndex();   // 获取索引名
String id = firstItem.getId();   // 获取文档Id
if (firstGet.isExists()) { // 检测文档是否存在
    // 获取版本号
    long version = firstGet.getVersion(); 
    
    // 查询文档内容,json字符串格式
    String sourceAsString = firstGet.getSourceAsString();      
    // 查询文档内容,Map对象格式  
    Map<String, Object> sourceAsMap = firstGet.getSourceAsMap(); 
} else {
    
}

Bulk API(批量操作)

ES 的 Bulk API 主要用于在单个请求中,批量执行创建、更新、删除文档操作,避免循环发送大量的 ES 请求。

创建“BulkRequest”

BulkRequest request = new BulkRequest();

添加操作对象

支持 index/update/delete 操作。

批量index操作

// 通过add方法,添加IndexRequest对象,创建文档,下面插入3个文档
// ps: IndexRequest对象,以键值对的方式设置文档内容
request.add(new IndexRequest("posts").id("1").source(XContentType.JSON, "field", "foo"));
request.add(new IndexRequest("posts").id("2").source(XContentType.JSON, "field", "bar"));
request.add(new IndexRequest("posts").id("3").source(XContentType.JSON, "field", "baz"));

混合操作

批量执行文档的删除、更新、创建操作。

request.add(new DeleteRequest("posts", "3")); 
request.add(new UpdateRequest("posts", "2").doc(XContentType.JSON,"other", "test"));
request.add(new IndexRequest("posts").id("4").source(XContentType.JSON,"field", "baz"));

可选参数

timeout

request.timeout(TimeValue.timeValueMinutes(2)); // 形式1:2分钟
request.timeout("2m"); // 形式2:2分钟

执行请求

BulkResponse bulkResponse = client.bulk(request, RequestOptions.DEFAULT);

处理结果

if (bulkResponse.hasFailures()) { 
    // 至少存在一个错误处理
}

// 循环检测批量操作结果
for (BulkItemResponse bulkItemResponse : bulkResponse) { 
    DocWriteResponse itemResponse = bulkItemResponse.getResponse(); 
    
    // 根据操作类型检测执行结果
    switch (bulkItemResponse.getOpType()) {
        case INDEX:    
        case CREATE:
            // 处理创建请求
            IndexResponse indexResponse = (IndexResponse) itemResponse;
            break;
        case UPDATE:   
            // 处理更新请求
            UpdateResponse updateResponse = (UpdateResponse) itemResponse;
            break;
        case DELETE:  
            // 处理删除请求 
            DeleteResponse deleteResponse = (DeleteResponse) itemResponse;
    }
}