Java Elasticsearch:Document APIs

来自Wikioe
Eijux讨论 | 贡献2023年3月31日 (五) 22:03的版本
(差异) ←上一版本 | 最后版本 (差异) | 下一版本→ (差异)
跳到导航 跳到搜索


Index Api

Java Elasticsearch Index API 主要用于插入或者更新文档数据。

创建“IndexRequest”

// 创建Index Request,设置索引名为: posts
IndexRequest request = new IndexRequest("posts"); 

// 设置文档ID
request.id("1");

设置文档内容

支持以JSON字符串方式或者map方式设置文档内容。

JSON字符串方式

// 创建 JSON字符串
String jsonString = "{" +
        "\"user\":\"kimchy\"," +
        "\"postDate\":\"2013-01-30\"," +
        "\"message\":\"trying out Elasticsearch\"" +
        "}";

// 设置文档内容
request.source(jsonString, XContentType.JSON);

map方式

// 创建 Map
Map<String, Object> jsonMap = new HashMap<>();
jsonMap.put("user", "kimchy");
jsonMap.put("postDate", new Date());
jsonMap.put("message", "trying out Elasticsearch");

// 设置文档内容
request.source(jsonMap);

可选参数

routing

设置路由字段

request.routing("routing");

timeout

设置单个请求超时参数

request.timeout(TimeValue.timeValueSeconds(1)); 
// or
request.timeout("1s");

Version

设置文档版本

request.version(2);

操作类型

Index api支持两类操作:create 或者 index (默认)

request.opType("create");

执行请求

同步执行

IndexResponse indexResponse = client.index(request, RequestOptions.DEFAULT);

异步执行

  • 异步执行通过回调方法进行后续操作,不需要通过“xxxResponse”获取返回结果。
client.indexAsync(request, RequestOptions.DEFAULT, new ActionListener<IndexResponse>() {
            @Override
            public void onResponse(IndexResponse indexResponse) {
                // 请求成功回调函数
            }

            @Override
            public void onFailure(Exception e) {
                // 请求失败回调函数
            }
        });

处理结果

// 获取索引名
String index = indexResponse.getIndex();
// 获取文档ID
String id = indexResponse.getId();

if (indexResponse.getResult() == DocWriteResponse.Result.CREATED) {
    // 成功创建文档
} else if (indexResponse.getResult() == DocWriteResponse.Result.UPDATED) {
    // 成功更新文档
}

Get Api

Get Api 主要用于根据文档ID查询索引数据。

创建“GetRequest”

// 创建GetRequest,索引名=posts, 文档ID=1
GetRequest getRequest = new GetRequest("posts", "1");

可选参数

是否返回文档内容

  • 默认返回文档内容


// 不返回文档内容
request.fetchSourceContext(FetchSourceContext.DO_NOT_FETCH_SOURCE);

返回、过滤:指定字段

// 设置返回指定字段
String[] includes = new String[]{"message", "*Date"};
//String[] includes = Strings.EMPTY_ARRAY;

// 过滤指定字段
String[] excludes = new String[]{"message"};
//String[] excludes = Strings.EMPTY_ARRAY;

FetchSourceContext fetchSourceContext = new FetchSourceContext(true, includes, excludes);

request.fetchSourceContext(fetchSourceContext);

routing

request.routing("routing");

Version

request.version(2);

执行请求

同步执行

GetResponse getResponse = client.get(getRequest, RequestOptions.DEFAULT);

异步执行

  • 异步执行通过回调方法进行后续操作,不需要通过“xxxResponse”获取返回结果。
client.getAsync(request, RequestOptions.DEFAULT, new ActionListener<GetResponse>() {
            @Override
            public void onResponse(GetResponse getResponse) {
                // 请求成功回调函数
            }

            @Override
            public void onFailure(Exception e) {
                // 请求失败回调函数
            }
        });

处理结果

String index = getResponse.getIndex();
String id = getResponse.getId();

// 检测索引是否存在
if (getResponse.isExists()) {
    // 获取版本号
    long version = getResponse.getVersion();
    
    // 获取文档内容,json字符串形式
    String sourceAsString = getResponse.getSourceAsString();    
    // 获取文档内容,map形式
    Map<String, Object> sourceAsMap = getResponse.getSourceAsMap(); 
    // 获取文档内容,字节数组形式
    byte[] sourceAsBytes = getResponse.getSourceAsBytes();          
} else {
    //
}

Delete API

Delete API 主要用于根据文档ID删除索引文档。

创建“DeleteRequest”

// 设置索引名=posts, 文档id=1
DeleteRequest request = new DeleteRequest("posts", "1");

可选参数

routing

request.routing("routing");

timeout

request.timeout(TimeValue.timeValueMinutes(2));    //格式1: 2分钟

request.timeout("2m");    //格式2:2分钟

Version

request.version(2);

执行请求

同步执行

DeleteResponse deleteResponse = client.delete(request, RequestOptions.DEFAULT);

异步执行

  • 异步执行通过回调方法进行后续操作,不需要通过“xxxResponse”获取返回结果。
client.deleteAsync(request, RequestOptions.DEFAULT, new ActionListener<DeleteResponse>() {
            @Override
            public void onResponse(DeleteResponse deleteResponse) {
                // 请求成功回调函数
            }

            @Override
            public void onFailure(Exception e) {
                // 请求失败回调函数
            }
        });

Update API

Elasticsearch Update API 根据文档ID更新文档内容。


主要支持两种方式更新文档内容:

  1. 通过脚本的方式更新
  2. 更新文档部分字段


  • 如果被更新的文档不存在,也支持通过 upsert api 实现插入操作。

创建“UpdateRequest”

// 创建UpdateRequest请求,索引名=posts,文档ID=1
UpdateRequest request = new UpdateRequest("posts", "1");

设置更新内容

UpdateRequest 对象支持下面几种更新文档内容的方式,根据需要选择一种方式即可:

脚本方式

通过 ES 内置 script 脚本更新文档内容。

// 定义脚本参数
Map<String, Object> parameters = singletonMap("count", 4); 

// 创建inline脚本,使用painless语言,实现field字段 + count参数值
Script inline = new Script(ScriptType.INLINE, "painless",
        "ctx._source.field += params.count", parameters);  

// 设置脚本
request.script(inline);

map方式

通过 map 对象 更新文档部分内容。

// 通过map对象设置需要更新的字段内容
Map<String, Object> jsonMap = new HashMap<>();
jsonMap.put("updated", new Date());
jsonMap.put("reason", "daily update");

// 设置更新内容
request.doc(jsonMap);

json字符串方式

通过 json字符串 方式更新文档部分内容。

String jsonString = "{" +
        "\"updated\":\"2017-01-01\"," +
        "\"reason\":\"daily update\"" +
        "}";
request.doc(jsonString, XContentType.JSON);

upsert方式

通过 Upsert 方式更新文档内容。

  • 跟前面三种类似,支持json字符串、map、脚本方式
  • 但是有一点区别,如果被更新的文档不存在,则会执行插入操作

如:(json字符串方式)

String jsonString = "{\"created\":\"2017-01-01\"}";
request.upsert(jsonString, XContentType.JSON);

可选参数

routing

request.routing("routing");

timeout

request.timeout(TimeValue.timeValueSeconds(1)); // 方式1:1秒

request.timeout("1s"); // 方式2:1秒

版本冲突重试

如果更新文档的时候出现版本冲突,重试几次。

request.retryOnConflict(3);

并发控制

设置并发控制参数,新版的ES已经废弃老的 version 字段。

// 设置版本号
request.setIfSeqNo(2L); 
// 设置文档所在的主分区
request.setIfPrimaryTerm(1L);

执行请求

同步执行

UpdateResponse updateResponse = client.update(request, RequestOptions.DEFAULT);

异步执行

  • 异步执行通过回调方法进行后续操作,不需要通过“xxxResponse”获取返回结果。
client.updateAsync(request, RequestOptions.DEFAULT, new ActionListener<UpdateResponse>() {
            @Override
            public void onResponse(UpdateResponse updateResponse) {
              // 执行成功
            }

            @Override
            public void onFailure(Exception e) {
               // 执行失败
            }
        });

处理结果

// 索引名
String index = updateResponse.getIndex();
// 文档id
String id = updateResponse.getId();
// 版本号
long version = updateResponse.getVersion();

if (updateResponse.getResult() == DocWriteResponse.Result.CREATED) {
    // 成功创建文档
} else if (updateResponse.getResult() == DocWriteResponse.Result.UPDATED) {
    // 成功更新文档
}

Update By Query API(批量更新)

ES update by query api主要用于批量更新文档内容,支持设置查询条件限制更新文档的范围。

创建“UpdateByQueryRequest”

// 创建UpdateByQueryRequest对象,设置索引名,支持一次更新多个索引
// 同时更新source1和source2索引内容
UpdateByQueryRequest request = new UpdateByQueryRequest("source1", "source2");

版本冲突

批量更新内容的时候,可能会遇到文档版本冲突的情况,需要设置版本冲突的时候如何处理。


版本冲突解决方案如下:

  • proceed:忽略版本冲突,继续执行
  • abort:遇到版本冲突,中断执行


request.setConflicts("proceed");

设置查询条件

  • (ES的查询语法是非常丰富的,这里仅给出一种写法)
// 设置term查询条件,查询user字段=kimchy的文档内容
request.setQuery(new TermQueryBuilder("user", "kimchy"));

限制更新文档数量

可以限制批量更新文档的数量。

request.setMaxDocs(10);

设置更新内容

Update by query api更新文档内容,仅支持通过【脚本方式】修改文档内容

request.setScript(
    new Script( // 创建inline脚本,使用painless语言。
        ScriptType.INLINE, "painless",
        "if (ctx._source.user == 'kimchy') {ctx._source.likes++;}",
        Collections.emptyMap()));

执行请求

BulkByScrollResponse bulkResponse = client.updateByQuery(request, RequestOptions.DEFAULT);

处理结果

TimeValue timeTaken = bulkResponse.getTook(); // 更新花费时间
boolean timedOut = bulkResponse.isTimedOut(); // 是否超时
long totalDocs = bulkResponse.getTotal(); // 更新文档总数
long updatedDocs = bulkResponse.getUpdated(); // 成功更新了多少文档
long deletedDocs = bulkResponse.getDeleted(); // 删除了多少文档
long versionConflicts = bulkResponse.getVersionConflicts(); // 版本冲突次数

Delete By Query API(批量删除)

Java ES Delete By Query API 主要用于批量删除操作,支持设置查询条件

创建“DeleteByQueryRequest”

// 创建 DeleteByQueryRequest 对象,支持同时操作多个索引
// 设置批量删除的索引名为:source1和source2
DeleteByQueryRequest request = new DeleteByQueryRequest("source1", "source2");

版本冲突

批量更新内容的时候,可能会遇到文档版本冲突的情况,需要设置版本冲突的时候如何处理。


版本冲突解决方案如下:

  • proceed:忽略版本冲突,继续执行
  • abort:遇到版本冲突,中断执行


request.setConflicts("proceed");

设置查询条件

  • (ES的查询语法是非常丰富的,这里仅给出一种写法)
// 设置term查询条件,查询user字段=kimchy的文档内容
request.setQuery(new TermQueryBuilder("user", "kimchy"));

限制删除文档数量

request.setMaxDocs(10);

执行请求

BulkByScrollResponse bulkResponse = client.deleteByQuery(request, RequestOptions.DEFAULT);

处理结果

TimeValue timeTaken = bulkResponse.getTook(); // 批量操作消耗时间
boolean timedOut = bulkResponse.isTimedOut(); // 是否超时
long totalDocs = bulkResponse.getTotal(); // 涉及文档总数
long deletedDocs = bulkResponse.getDeleted(); // 成功删除文档数量
long versionConflicts = bulkResponse.getVersionConflicts(); // 版本冲突次数

Multi-Get API(批量查询)

Multi-Get API 主要用于根据id集合批量查询文档内容,支持查询多个索引内容。

创建“MultiGetRequest”

MultiGetRequest request = new MultiGetRequest();

// 通过 MultiGetRequest.Item 对象设置查询参数
// 添加另外一组查询参数,索引名=index, 索引Id=12345
request.add(new MultiGetRequest.Item("index", "12345"));  // 文档id

// 添加另外一组查询参数,索引名=index, 索引Id=another_id
request.add(new MultiGetRequest.Item("index", "another_id"));

执行请求

MultiGetResponse response = client.mget(request, RequestOptions.DEFAULT);

处理结果

  • 每个 MultiGetItemResponse 对象代表一个查询结果。
// response.getResponses 返回多个 MultiGetItemResponse 对象,每个MultiGetItemResponse对象代表一个查询结果,这里以其中一个结果为例。
// ps: 通常是需要循环遍历response.getResponses返回的结果
MultiGetItemResponse firstItem = response.getResponses()[0];

assertNull(firstItem.getFailure());

GetResponse firstGet = firstItem.getResponse();
String index = firstItem.getIndex();   // 获取索引名
String id = firstItem.getId();   // 获取文档Id
if (firstGet.isExists()) { // 检测文档是否存在
    // 获取版本号
    long version = firstGet.getVersion(); 
    
    // 查询文档内容,json字符串格式
    String sourceAsString = firstGet.getSourceAsString();      
    // 查询文档内容,Map对象格式  
    Map<String, Object> sourceAsMap = firstGet.getSourceAsMap(); 
} else {
    
}

Bulk API(批量操作)

ES 的 Bulk API 主要用于在单个请求中,批量执行创建、更新、删除文档操作,避免循环发送大量的 ES 请求。

创建“BulkRequest”

BulkRequest request = new BulkRequest();

添加操作对象

支持 index/update/delete 操作。

批量index操作

// 通过add方法,添加IndexRequest对象,创建文档,下面插入3个文档
// ps: IndexRequest对象,以键值对的方式设置文档内容
request.add(new IndexRequest("posts").id("1").source(XContentType.JSON, "field", "foo"));
request.add(new IndexRequest("posts").id("2").source(XContentType.JSON, "field", "bar"));
request.add(new IndexRequest("posts").id("3").source(XContentType.JSON, "field", "baz"));

混合操作

批量执行文档的删除、更新、创建操作。

request.add(new DeleteRequest("posts", "3")); 
request.add(new UpdateRequest("posts", "2").doc(XContentType.JSON,"other", "test"));
request.add(new IndexRequest("posts").id("4").source(XContentType.JSON,"field", "baz"));

可选参数

timeout

request.timeout(TimeValue.timeValueMinutes(2)); // 形式1:2分钟
request.timeout("2m"); // 形式2:2分钟

执行请求

BulkResponse bulkResponse = client.bulk(request, RequestOptions.DEFAULT);

处理结果

if (bulkResponse.hasFailures()) { 
    // 至少存在一个错误处理
}

// 循环检测批量操作结果
for (BulkItemResponse bulkItemResponse : bulkResponse) { 
    DocWriteResponse itemResponse = bulkItemResponse.getResponse(); 
    
    // 根据操作类型检测执行结果
    switch (bulkItemResponse.getOpType()) {
        case INDEX:    
        case CREATE:
            // 处理创建请求
            IndexResponse indexResponse = (IndexResponse) itemResponse;
            break;
        case UPDATE:   
            // 处理更新请求
            UpdateResponse updateResponse = (UpdateResponse) itemResponse;
            break;
        case DELETE:  
            // 处理删除请求 
            DeleteResponse deleteResponse = (DeleteResponse) itemResponse;
    }
}