“Java Elasticsearch:Document APIs”的版本间差异
(建立内容为“category:ElasticSearch category:Java == Index Api == Java Elasticsearch Index API 主要用于插入或者更新文档数据。 === 创建Index Request…”的新页面) |
小无编辑摘要 |
||
(未显示同一用户的15个中间版本) | |||
第1行: | 第1行: | ||
[[category:ElasticSearch | [[category:Java&ElasticSearch]] | ||
== Index Api == | == Index Api == | ||
Java Elasticsearch Index API | Java Elasticsearch Index API 主要用于'''插入'''或者'''更新'''文档数据。 | ||
=== | === 创建“IndexRequest” === | ||
<syntaxhighlight lang="Java" highlight=""> | <syntaxhighlight lang="Java" highlight=""> | ||
// 创建Index Request,设置索引名为: posts | // 创建Index Request,设置索引名为: posts | ||
第15行: | 第14行: | ||
=== 设置文档内容 === | === 设置文档内容 === | ||
支持以''' | 支持以'''JSON字符串'''方式或者'''map'''方式设置文档内容。 | ||
==== JSON字符串方式 ==== | |||
<syntaxhighlight lang="Java" highlight=""> | <syntaxhighlight lang="Java" highlight=""> | ||
// 创建 JSON字符串 | // 创建 JSON字符串 | ||
第31行: | 第29行: | ||
</syntaxhighlight> | </syntaxhighlight> | ||
==== map方式 ==== | |||
<syntaxhighlight lang="Java" highlight=""> | <syntaxhighlight lang="Java" highlight=""> | ||
// 创建 Map | // 创建 Map | ||
第79行: | 第76行: | ||
==== 异步执行 ==== | ==== 异步执行 ==== | ||
* 异步执行通过回调方法进行后续操作,不需要通过“xxxResponse”获取返回结果。 | |||
<syntaxhighlight lang="Java" highlight=""> | <syntaxhighlight lang="Java" highlight=""> | ||
client.indexAsync(request, RequestOptions.DEFAULT, new ActionListener<IndexResponse>() { | |||
@Override | @Override | ||
public void onResponse(IndexResponse indexResponse) { | public void onResponse(IndexResponse indexResponse) { | ||
第93行: | 第92行: | ||
</syntaxhighlight> | </syntaxhighlight> | ||
=== | === 处理结果 === | ||
<syntaxhighlight lang="Java" highlight=""> | <syntaxhighlight lang="Java" highlight=""> | ||
// 获取索引名 | // 获取索引名 | ||
第108行: | 第107行: | ||
== Get Api == | == Get Api == | ||
Get Api 主要用于根据文档ID'''查询'''索引数据。 | |||
=== 创建“GetRequest” === | |||
<syntaxhighlight lang="Java" highlight=""> | <syntaxhighlight lang="Java" highlight=""> | ||
// 创建GetRequest,索引名=posts, 文档ID=1 | |||
GetRequest getRequest = new GetRequest("posts", "1"); | |||
</syntaxhighlight> | |||
=== 可选参数 === | |||
==== 是否返回文档内容 ==== | |||
* 默认返回文档内容 | |||
<syntaxhighlight lang="Java" highlight=""> | |||
// 不返回文档内容 | |||
request.fetchSourceContext(FetchSourceContext.DO_NOT_FETCH_SOURCE); | |||
</syntaxhighlight> | </syntaxhighlight> | ||
==== 返回、过滤:指定字段 ==== | |||
<syntaxhighlight lang="Java" highlight=""> | |||
// 设置返回指定字段 | |||
String[] includes = new String[]{"message", "*Date"}; | |||
//String[] includes = Strings.EMPTY_ARRAY; | |||
// 过滤指定字段 | |||
String[] excludes = new String[]{"message"}; | |||
//String[] excludes = Strings.EMPTY_ARRAY; | |||
FetchSourceContext fetchSourceContext = new FetchSourceContext(true, includes, excludes); | |||
request.fetchSourceContext(fetchSourceContext); | |||
</syntaxhighlight> | </syntaxhighlight> | ||
==== routing ==== | |||
<syntaxhighlight lang="Java" highlight=""> | |||
request.routing("routing"); | |||
</syntaxhighlight> | |||
==== Version ==== | |||
<syntaxhighlight lang="Java" highlight=""> | <syntaxhighlight lang="Java" highlight=""> | ||
request.version(2); | |||
</syntaxhighlight> | |||
=== 执行请求 === | |||
==== 同步执行 ==== | |||
<syntaxhighlight lang="Java" highlight=""> | |||
GetResponse getResponse = client.get(getRequest, RequestOptions.DEFAULT); | |||
</syntaxhighlight> | </syntaxhighlight> | ||
==== 异步执行 ==== | |||
* 异步执行通过回调方法进行后续操作,不需要通过“xxxResponse”获取返回结果。 | |||
<syntaxhighlight lang="Java" highlight=""> | <syntaxhighlight lang="Java" highlight=""> | ||
client.getAsync(request, RequestOptions.DEFAULT, new ActionListener<GetResponse>() { | |||
@Override | |||
public void onResponse(GetResponse getResponse) { | |||
// 请求成功回调函数 | |||
} | |||
@Override | |||
public void onFailure(Exception e) { | |||
// 请求失败回调函数 | |||
} | |||
}); | |||
</syntaxhighlight> | </syntaxhighlight> | ||
=== 处理结果 === | |||
<syntaxhighlight lang="Java" highlight=""> | <syntaxhighlight lang="Java" highlight=""> | ||
String index = getResponse.getIndex(); | |||
String id = getResponse.getId(); | |||
// 检测索引是否存在 | |||
if (getResponse.isExists()) { | |||
// 获取版本号 | |||
long version = getResponse.getVersion(); | |||
// 获取文档内容,json字符串形式 | |||
String sourceAsString = getResponse.getSourceAsString(); | |||
// 获取文档内容,map形式 | |||
Map<String, Object> sourceAsMap = getResponse.getSourceAsMap(); | |||
// 获取文档内容,字节数组形式 | |||
byte[] sourceAsBytes = getResponse.getSourceAsBytes(); | |||
} else { | |||
// | |||
} | |||
</syntaxhighlight> | </syntaxhighlight> | ||
== Delete API == | |||
Delete API 主要用于根据文档ID'''删除'''索引文档。 | |||
=== 创建“DeleteRequest” === | |||
<syntaxhighlight lang="Java" highlight=""> | <syntaxhighlight lang="Java" highlight=""> | ||
// 设置索引名=posts, 文档id=1 | |||
DeleteRequest request = new DeleteRequest("posts", "1"); | |||
</syntaxhighlight> | |||
=== 可选参数 === | |||
==== routing ==== | |||
<syntaxhighlight lang="Java" highlight=""> | |||
request.routing("routing"); | |||
</syntaxhighlight> | </syntaxhighlight> | ||
==== timeout ==== | |||
<syntaxhighlight lang="Java" highlight=""> | |||
request.timeout(TimeValue.timeValueMinutes(2)); //格式1: 2分钟 | |||
request.timeout("2m"); //格式2:2分钟 | |||
</syntaxhighlight> | |||
==== Version ==== | |||
<syntaxhighlight lang="Java" highlight=""> | <syntaxhighlight lang="Java" highlight=""> | ||
request.version(2); | |||
</syntaxhighlight> | |||
=== 执行请求 === | |||
==== 同步执行 ==== | |||
<syntaxhighlight lang="Java" highlight=""> | |||
DeleteResponse deleteResponse = client.delete(request, RequestOptions.DEFAULT); | |||
</syntaxhighlight> | </syntaxhighlight> | ||
==== 异步执行 ==== | |||
* 异步执行通过回调方法进行后续操作,不需要通过“xxxResponse”获取返回结果。 | |||
<syntaxhighlight lang="Java" highlight=""> | <syntaxhighlight lang="Java" highlight=""> | ||
client.deleteAsync(request, RequestOptions.DEFAULT, new ActionListener<DeleteResponse>() { | |||
@Override | |||
public void onResponse(DeleteResponse deleteResponse) { | |||
// 请求成功回调函数 | |||
} | |||
@Override | |||
public void onFailure(Exception e) { | |||
// 请求失败回调函数 | |||
} | |||
}); | |||
</syntaxhighlight> | </syntaxhighlight> | ||
== Update API == | |||
Elasticsearch Update API 根据文档ID'''更新'''文档内容。 | |||
主要支持两种方式更新文档内容: | |||
# '''通过脚本的方式更新'''; | |||
# '''更新文档部分字段'''。 | |||
* 如果被更新的文档不存在,也支持通过 '''upsert''' api 实现'''插入'''操作。 | |||
=== 创建“UpdateRequest” === | |||
<syntaxhighlight lang="Java" highlight=""> | <syntaxhighlight lang="Java" highlight=""> | ||
// 创建UpdateRequest请求,索引名=posts,文档ID=1 | |||
UpdateRequest request = new UpdateRequest("posts", "1"); | |||
</syntaxhighlight> | |||
=== 设置更新内容 === | |||
UpdateRequest 对象支持下面几种更新文档内容的方式,根据需要选择一种方式即可: | |||
==== 脚本方式 ==== | |||
通过 ES 内置 '''script''' 脚本更新文档内容。 | |||
<syntaxhighlight lang="Java" highlight=""> | <syntaxhighlight lang="Java" highlight=""> | ||
// 定义脚本参数 | |||
Map<String, Object> parameters = singletonMap("count", 4); | |||
// 创建inline脚本,使用painless语言,实现field字段 + count参数值 | |||
Script inline = new Script(ScriptType.INLINE, "painless", | |||
"ctx._source.field += params.count", parameters); | |||
// 设置脚本 | |||
request.script(inline); | |||
</syntaxhighlight> | </syntaxhighlight> | ||
==== map方式 ==== | |||
通过 '''map 对象''' 更新文档部分内容。 | |||
<syntaxhighlight lang="Java" highlight=""> | <syntaxhighlight lang="Java" highlight=""> | ||
// 通过map对象设置需要更新的字段内容 | |||
Map<String, Object> jsonMap = new HashMap<>(); | |||
jsonMap.put("updated", new Date()); | |||
jsonMap.put("reason", "daily update"); | |||
// 设置更新内容 | |||
request.doc(jsonMap); | |||
</syntaxhighlight> | </syntaxhighlight> | ||
==== json字符串方式 ==== | |||
通过 '''json字符串''' 方式更新文档部分内容。 | |||
<syntaxhighlight lang="Java" highlight=""> | <syntaxhighlight lang="Java" highlight=""> | ||
String jsonString = "{" + | |||
"\"updated\":\"2017-01-01\"," + | |||
"\"reason\":\"daily update\"" + | |||
"}"; | |||
request.doc(jsonString, XContentType.JSON); | |||
</syntaxhighlight> | |||
==== upsert方式 ==== | |||
通过 '''Upsert''' 方式更新文档内容。 | |||
* 跟前面三种类似,'''支持json字符串、map、脚本方式'''; | |||
* 但是有一点区别,'''如果被更新的文档不存在,则会执行插入操作'''。 | |||
如:(json字符串方式) | |||
<syntaxhighlight lang="Java" highlight=""> | |||
String jsonString = "{\"created\":\"2017-01-01\"}"; | |||
request.upsert(jsonString, XContentType.JSON); | |||
</syntaxhighlight> | </syntaxhighlight> | ||
=== 可选参数 === | |||
==== routing ==== | |||
<syntaxhighlight lang="Java" highlight=""> | |||
request.routing("routing"); | |||
</syntaxhighlight> | |||
==== timeout ==== | |||
<syntaxhighlight lang="Java" highlight=""> | <syntaxhighlight lang="Java" highlight=""> | ||
request.timeout(TimeValue.timeValueSeconds(1)); // 方式1:1秒 | |||
request.timeout("1s"); // 方式2:1秒 | |||
</syntaxhighlight> | </syntaxhighlight> | ||
==== 版本冲突重试 ==== | |||
如果更新文档的时候出现版本冲突,重试几次。 | |||
<syntaxhighlight lang="Java" highlight=""> | <syntaxhighlight lang="Java" highlight=""> | ||
request.retryOnConflict(3); | |||
</syntaxhighlight> | </syntaxhighlight> | ||
==== 并发控制 ==== | |||
设置并发控制参数,新版的ES已经废弃老的 version 字段。 | |||
<syntaxhighlight lang="Java" highlight=""> | <syntaxhighlight lang="Java" highlight=""> | ||
// 设置版本号 | |||
request.setIfSeqNo(2L); | |||
// 设置文档所在的主分区 | |||
request.setIfPrimaryTerm(1L); | |||
</syntaxhighlight> | |||
=== 执行请求 === | |||
==== 同步执行 ==== | |||
<syntaxhighlight lang="Java" highlight=""> | |||
UpdateResponse updateResponse = client.update(request, RequestOptions.DEFAULT); | |||
</syntaxhighlight> | </syntaxhighlight> | ||
==== 异步执行 ==== | |||
* 异步执行通过回调方法进行后续操作,不需要通过“xxxResponse”获取返回结果。 | |||
<syntaxhighlight lang="Java" highlight=""> | <syntaxhighlight lang="Java" highlight=""> | ||
client.updateAsync(request, RequestOptions.DEFAULT, new ActionListener<UpdateResponse>() { | |||
@Override | |||
public void onResponse(UpdateResponse updateResponse) { | |||
// 执行成功 | |||
} | |||
@Override | |||
public void onFailure(Exception e) { | |||
// 执行失败 | |||
} | |||
}); | |||
</syntaxhighlight> | </syntaxhighlight> | ||
=== 处理结果 === | |||
<syntaxhighlight lang="Java" highlight=""> | <syntaxhighlight lang="Java" highlight=""> | ||
// 索引名 | |||
String index = updateResponse.getIndex(); | |||
// 文档id | |||
String id = updateResponse.getId(); | |||
// 版本号 | |||
long version = updateResponse.getVersion(); | |||
if (updateResponse.getResult() == DocWriteResponse.Result.CREATED) { | |||
// 成功创建文档 | |||
} else if (updateResponse.getResult() == DocWriteResponse.Result.UPDATED) { | |||
// 成功更新文档 | |||
} | |||
</syntaxhighlight> | </syntaxhighlight> | ||
== Update By Query API(批量更新) == | |||
ES update by query api主要用于'''批量更新'''文档内容,支持'''设置查询条件限制更新'''文档的范围。 | |||
=== 创建“UpdateByQueryRequest” === | |||
<syntaxhighlight lang="Java" highlight=""> | <syntaxhighlight lang="Java" highlight=""> | ||
// 创建UpdateByQueryRequest对象,设置索引名,支持一次更新多个索引 | |||
// 同时更新source1和source2索引内容 | |||
UpdateByQueryRequest request = new UpdateByQueryRequest("source1", "source2"); | |||
</syntaxhighlight> | |||
=== 版本冲突 === | |||
批量更新内容的时候,可能会遇到文档版本冲突的情况,需要设置版本冲突的时候如何处理。 | |||
版本冲突解决方案如下: | |||
* '''proceed''':忽略版本冲突,继续执行 | |||
* '''abort''':遇到版本冲突,中断执行 | |||
<syntaxhighlight lang="Java" highlight=""> | <syntaxhighlight lang="Java" highlight=""> | ||
request.setConflicts("proceed"); | |||
</syntaxhighlight> | </syntaxhighlight> | ||
=== 设置查询条件 === | |||
* (ES的查询语法是非常丰富的,这里仅给出一种写法) | |||
<syntaxhighlight lang="Java" highlight=""> | <syntaxhighlight lang="Java" highlight=""> | ||
// 设置term查询条件,查询user字段=kimchy的文档内容 | |||
request.setQuery(new TermQueryBuilder("user", "kimchy")); | |||
</syntaxhighlight> | </syntaxhighlight> | ||
=== 限制更新文档数量 === | |||
可以限制批量更新文档的数量。 | |||
<syntaxhighlight lang="Java" highlight=""> | <syntaxhighlight lang="Java" highlight=""> | ||
request.setMaxDocs(10); | |||
</syntaxhighlight> | </syntaxhighlight> | ||
=== 设置更新内容 === | |||
Update by query api更新文档内容,'''仅支持通过【脚本方式】修改文档内容'''。 | |||
<syntaxhighlight lang="Java" highlight=""> | <syntaxhighlight lang="Java" highlight=""> | ||
request.setScript( | |||
new Script( // 创建inline脚本,使用painless语言。 | |||
ScriptType.INLINE, "painless", | |||
"if (ctx._source.user == 'kimchy') {ctx._source.likes++;}", | |||
Collections.emptyMap())); | |||
</syntaxhighlight> | |||
=== 执行请求 === | |||
<syntaxhighlight lang="Java" highlight=""> | |||
BulkByScrollResponse bulkResponse = client.updateByQuery(request, RequestOptions.DEFAULT); | |||
</syntaxhighlight> | </syntaxhighlight> | ||
=== 处理结果 === | |||
<syntaxhighlight lang="Java" highlight=""> | <syntaxhighlight lang="Java" highlight=""> | ||
TimeValue timeTaken = bulkResponse.getTook(); // 更新花费时间 | |||
boolean timedOut = bulkResponse.isTimedOut(); // 是否超时 | |||
long totalDocs = bulkResponse.getTotal(); // 更新文档总数 | |||
long updatedDocs = bulkResponse.getUpdated(); // 成功更新了多少文档 | |||
long deletedDocs = bulkResponse.getDeleted(); // 删除了多少文档 | |||
long versionConflicts = bulkResponse.getVersionConflicts(); // 版本冲突次数 | |||
</syntaxhighlight> | </syntaxhighlight> | ||
== Delete By Query API(批量删除) == | |||
Java ES Delete By Query API 主要用于'''批量删除'''操作,支持'''设置查询条件'''。 | |||
=== 创建“DeleteByQueryRequest” === | |||
<syntaxhighlight lang="Java" highlight=""> | <syntaxhighlight lang="Java" highlight=""> | ||
// 创建 DeleteByQueryRequest 对象,支持同时操作多个索引 | |||
// 设置批量删除的索引名为:source1和source2 | |||
DeleteByQueryRequest request = new DeleteByQueryRequest("source1", "source2"); | |||
</syntaxhighlight> | </syntaxhighlight> | ||
=== 版本冲突 === | |||
批量更新内容的时候,可能会遇到文档版本冲突的情况,需要设置版本冲突的时候如何处理。 | |||
版本冲突解决方案如下: | |||
* '''proceed''':忽略版本冲突,继续执行 | |||
* '''abort''':遇到版本冲突,中断执行 | |||
<syntaxhighlight lang="Java" highlight=""> | <syntaxhighlight lang="Java" highlight=""> | ||
request.setConflicts("proceed"); | |||
</syntaxhighlight> | </syntaxhighlight> | ||
=== 设置查询条件 === | |||
* (ES的查询语法是非常丰富的,这里仅给出一种写法) | |||
<syntaxhighlight lang="Java" highlight=""> | <syntaxhighlight lang="Java" highlight=""> | ||
// 设置term查询条件,查询user字段=kimchy的文档内容 | |||
request.setQuery(new TermQueryBuilder("user", "kimchy")); | |||
</syntaxhighlight> | |||
=== 限制删除文档数量 === | |||
<syntaxhighlight lang="Java" highlight=""> | |||
request.setMaxDocs(10); | |||
</syntaxhighlight> | </syntaxhighlight> | ||
=== 执行请求 === | |||
<syntaxhighlight lang="Java" highlight=""> | |||
BulkByScrollResponse bulkResponse = client.deleteByQuery(request, RequestOptions.DEFAULT); | |||
</syntaxhighlight> | |||
=== 处理结果 === | |||
<syntaxhighlight lang="Java" highlight=""> | <syntaxhighlight lang="Java" highlight=""> | ||
TimeValue timeTaken = bulkResponse.getTook(); // 批量操作消耗时间 | |||
boolean timedOut = bulkResponse.isTimedOut(); // 是否超时 | |||
long totalDocs = bulkResponse.getTotal(); // 涉及文档总数 | |||
long deletedDocs = bulkResponse.getDeleted(); // 成功删除文档数量 | |||
long versionConflicts = bulkResponse.getVersionConflicts(); // 版本冲突次数 | |||
</syntaxhighlight> | </syntaxhighlight> | ||
== Multi-Get API(批量查询) == | |||
Multi-Get API 主要用于'''根据id集合''','''批量查询'''文档内容,支持查询多个索引内容。 | |||
=== 创建“MultiGetRequest” === | |||
<syntaxhighlight lang="Java" highlight=""> | <syntaxhighlight lang="Java" highlight=""> | ||
MultiGetRequest request = new MultiGetRequest(); | |||
// 通过 MultiGetRequest.Item 对象设置查询参数 | |||
// 添加另外一组查询参数,索引名=index, 索引Id=12345 | |||
request.add(new MultiGetRequest.Item("index", "12345")); // 文档id | |||
// 添加另外一组查询参数,索引名=index, 索引Id=another_id | |||
request.add(new MultiGetRequest.Item("index", "another_id")); | |||
</syntaxhighlight> | </syntaxhighlight> | ||
=== 执行请求 === | |||
<syntaxhighlight lang="Java" highlight=""> | <syntaxhighlight lang="Java" highlight=""> | ||
MultiGetResponse response = client.mget(request, RequestOptions.DEFAULT); | |||
</syntaxhighlight> | </syntaxhighlight> | ||
=== 处理结果 === | |||
* 每个 '''MultiGetItemResponse''' 对象代表一个查询结果。 | |||
<syntaxhighlight lang="Java" highlight=""> | <syntaxhighlight lang="Java" highlight=""> | ||
// response.getResponses 返回多个 MultiGetItemResponse 对象,每个MultiGetItemResponse对象代表一个查询结果,这里以其中一个结果为例。 | |||
// ps: 通常是需要循环遍历response.getResponses返回的结果 | |||
MultiGetItemResponse firstItem = response.getResponses()[0]; | |||
assertNull(firstItem.getFailure()); | |||
GetResponse firstGet = firstItem.getResponse(); | |||
String index = firstItem.getIndex(); // 获取索引名 | |||
String id = firstItem.getId(); // 获取文档Id | |||
if (firstGet.isExists()) { // 检测文档是否存在 | |||
// 获取版本号 | |||
long version = firstGet.getVersion(); | |||
// 查询文档内容,json字符串格式 | |||
String sourceAsString = firstGet.getSourceAsString(); | |||
// 查询文档内容,Map对象格式 | |||
Map<String, Object> sourceAsMap = firstGet.getSourceAsMap(); | |||
} else { | |||
} | |||
</syntaxhighlight> | </syntaxhighlight> | ||
== Bulk API(批量操作) == | |||
ES 的 Bulk API 主要用于在'''单个请求中,批量执行创建、更新、删除文档操作''',避免循环发送大量的 ES 请求。 | |||
=== 创建“BulkRequest” === | |||
<syntaxhighlight lang="Java" highlight=""> | <syntaxhighlight lang="Java" highlight=""> | ||
BulkRequest request = new BulkRequest(); | |||
</syntaxhighlight> | </syntaxhighlight> | ||
=== 添加操作对象 === | |||
支持 '''index'''/'''update'''/'''delete''' 操作。 | |||
==== 批量index操作 ==== | |||
<syntaxhighlight lang="Java" highlight=""> | <syntaxhighlight lang="Java" highlight=""> | ||
// 通过add方法,添加IndexRequest对象,创建文档,下面插入3个文档 | |||
// ps: IndexRequest对象,以键值对的方式设置文档内容 | |||
request.add(new IndexRequest("posts").id("1").source(XContentType.JSON, "field", "foo")); | |||
request.add(new IndexRequest("posts").id("2").source(XContentType.JSON, "field", "bar")); | |||
request.add(new IndexRequest("posts").id("3").source(XContentType.JSON, "field", "baz")); | |||
</syntaxhighlight> | </syntaxhighlight> | ||
==== 混合操作 ==== | |||
批量执行文档的删除、更新、创建操作。 | |||
<syntaxhighlight lang="Java" highlight=""> | <syntaxhighlight lang="Java" highlight=""> | ||
request.add(new DeleteRequest("posts", "3")); | |||
request.add(new UpdateRequest("posts", "2").doc(XContentType.JSON,"other", "test")); | |||
request.add(new IndexRequest("posts").id("4").source(XContentType.JSON,"field", "baz")); | |||
</syntaxhighlight> | |||
=== 可选参数 === | |||
==== timeout ==== | |||
<syntaxhighlight lang="Java" highlight=""> | |||
request.timeout(TimeValue.timeValueMinutes(2)); // 形式1:2分钟 | |||
request.timeout("2m"); // 形式2:2分钟 | |||
</syntaxhighlight> | </syntaxhighlight> | ||
=== 执行请求 === | |||
<syntaxhighlight lang="Java" highlight=""> | |||
BulkResponse bulkResponse = client.bulk(request, RequestOptions.DEFAULT); | |||
</syntaxhighlight> | |||
=== 处理结果 === | |||
<syntaxhighlight lang="Java" highlight=""> | <syntaxhighlight lang="Java" highlight=""> | ||
if (bulkResponse.hasFailures()) { | |||
// 至少存在一个错误处理 | |||
} | |||
// 循环检测批量操作结果 | |||
for (BulkItemResponse bulkItemResponse : bulkResponse) { | |||
DocWriteResponse itemResponse = bulkItemResponse.getResponse(); | |||
// 根据操作类型检测执行结果 | |||
switch (bulkItemResponse.getOpType()) { | |||
case INDEX: | |||
case CREATE: | |||
// 处理创建请求 | |||
IndexResponse indexResponse = (IndexResponse) itemResponse; | |||
break; | |||
case UPDATE: | |||
// 处理更新请求 | |||
UpdateResponse updateResponse = (UpdateResponse) itemResponse; | |||
break; | |||
case DELETE: | |||
// 处理删除请求 | |||
DeleteResponse deleteResponse = (DeleteResponse) itemResponse; | |||
} | |||
} | |||
</syntaxhighlight> | </syntaxhighlight> |
2023年3月31日 (五) 22:03的最新版本
Index Api
Java Elasticsearch Index API 主要用于插入或者更新文档数据。
创建“IndexRequest”
// 创建Index Request,设置索引名为: posts
IndexRequest request = new IndexRequest("posts");
// 设置文档ID
request.id("1");
设置文档内容
支持以JSON字符串方式或者map方式设置文档内容。
JSON字符串方式
// 创建 JSON字符串
String jsonString = "{" +
"\"user\":\"kimchy\"," +
"\"postDate\":\"2013-01-30\"," +
"\"message\":\"trying out Elasticsearch\"" +
"}";
// 设置文档内容
request.source(jsonString, XContentType.JSON);
map方式
// 创建 Map
Map<String, Object> jsonMap = new HashMap<>();
jsonMap.put("user", "kimchy");
jsonMap.put("postDate", new Date());
jsonMap.put("message", "trying out Elasticsearch");
// 设置文档内容
request.source(jsonMap);
可选参数
routing
设置路由字段
request.routing("routing");
timeout
设置单个请求超时参数
request.timeout(TimeValue.timeValueSeconds(1));
// or
request.timeout("1s");
Version
设置文档版本
request.version(2);
操作类型
Index api支持两类操作:create 或者 index (默认)
request.opType("create");
执行请求
同步执行
IndexResponse indexResponse = client.index(request, RequestOptions.DEFAULT);
异步执行
- 异步执行通过回调方法进行后续操作,不需要通过“xxxResponse”获取返回结果。
client.indexAsync(request, RequestOptions.DEFAULT, new ActionListener<IndexResponse>() {
@Override
public void onResponse(IndexResponse indexResponse) {
// 请求成功回调函数
}
@Override
public void onFailure(Exception e) {
// 请求失败回调函数
}
});
处理结果
// 获取索引名
String index = indexResponse.getIndex();
// 获取文档ID
String id = indexResponse.getId();
if (indexResponse.getResult() == DocWriteResponse.Result.CREATED) {
// 成功创建文档
} else if (indexResponse.getResult() == DocWriteResponse.Result.UPDATED) {
// 成功更新文档
}
Get Api
Get Api 主要用于根据文档ID查询索引数据。
创建“GetRequest”
// 创建GetRequest,索引名=posts, 文档ID=1
GetRequest getRequest = new GetRequest("posts", "1");
可选参数
是否返回文档内容
- 默认返回文档内容
// 不返回文档内容
request.fetchSourceContext(FetchSourceContext.DO_NOT_FETCH_SOURCE);
返回、过滤:指定字段
// 设置返回指定字段
String[] includes = new String[]{"message", "*Date"};
//String[] includes = Strings.EMPTY_ARRAY;
// 过滤指定字段
String[] excludes = new String[]{"message"};
//String[] excludes = Strings.EMPTY_ARRAY;
FetchSourceContext fetchSourceContext = new FetchSourceContext(true, includes, excludes);
request.fetchSourceContext(fetchSourceContext);
routing
request.routing("routing");
Version
request.version(2);
执行请求
同步执行
GetResponse getResponse = client.get(getRequest, RequestOptions.DEFAULT);
异步执行
- 异步执行通过回调方法进行后续操作,不需要通过“xxxResponse”获取返回结果。
client.getAsync(request, RequestOptions.DEFAULT, new ActionListener<GetResponse>() {
@Override
public void onResponse(GetResponse getResponse) {
// 请求成功回调函数
}
@Override
public void onFailure(Exception e) {
// 请求失败回调函数
}
});
处理结果
String index = getResponse.getIndex();
String id = getResponse.getId();
// 检测索引是否存在
if (getResponse.isExists()) {
// 获取版本号
long version = getResponse.getVersion();
// 获取文档内容,json字符串形式
String sourceAsString = getResponse.getSourceAsString();
// 获取文档内容,map形式
Map<String, Object> sourceAsMap = getResponse.getSourceAsMap();
// 获取文档内容,字节数组形式
byte[] sourceAsBytes = getResponse.getSourceAsBytes();
} else {
//
}
Delete API
Delete API 主要用于根据文档ID删除索引文档。
创建“DeleteRequest”
// 设置索引名=posts, 文档id=1
DeleteRequest request = new DeleteRequest("posts", "1");
可选参数
routing
request.routing("routing");
timeout
request.timeout(TimeValue.timeValueMinutes(2)); //格式1: 2分钟
request.timeout("2m"); //格式2:2分钟
Version
request.version(2);
执行请求
同步执行
DeleteResponse deleteResponse = client.delete(request, RequestOptions.DEFAULT);
异步执行
- 异步执行通过回调方法进行后续操作,不需要通过“xxxResponse”获取返回结果。
client.deleteAsync(request, RequestOptions.DEFAULT, new ActionListener<DeleteResponse>() {
@Override
public void onResponse(DeleteResponse deleteResponse) {
// 请求成功回调函数
}
@Override
public void onFailure(Exception e) {
// 请求失败回调函数
}
});
Update API
Elasticsearch Update API 根据文档ID更新文档内容。
主要支持两种方式更新文档内容:
- 通过脚本的方式更新;
- 更新文档部分字段。
- 如果被更新的文档不存在,也支持通过 upsert api 实现插入操作。
创建“UpdateRequest”
// 创建UpdateRequest请求,索引名=posts,文档ID=1
UpdateRequest request = new UpdateRequest("posts", "1");
设置更新内容
UpdateRequest 对象支持下面几种更新文档内容的方式,根据需要选择一种方式即可:
脚本方式
通过 ES 内置 script 脚本更新文档内容。
// 定义脚本参数
Map<String, Object> parameters = singletonMap("count", 4);
// 创建inline脚本,使用painless语言,实现field字段 + count参数值
Script inline = new Script(ScriptType.INLINE, "painless",
"ctx._source.field += params.count", parameters);
// 设置脚本
request.script(inline);
map方式
通过 map 对象 更新文档部分内容。
// 通过map对象设置需要更新的字段内容
Map<String, Object> jsonMap = new HashMap<>();
jsonMap.put("updated", new Date());
jsonMap.put("reason", "daily update");
// 设置更新内容
request.doc(jsonMap);
json字符串方式
通过 json字符串 方式更新文档部分内容。
String jsonString = "{" +
"\"updated\":\"2017-01-01\"," +
"\"reason\":\"daily update\"" +
"}";
request.doc(jsonString, XContentType.JSON);
upsert方式
通过 Upsert 方式更新文档内容。
- 跟前面三种类似,支持json字符串、map、脚本方式;
- 但是有一点区别,如果被更新的文档不存在,则会执行插入操作。
如:(json字符串方式)
String jsonString = "{\"created\":\"2017-01-01\"}";
request.upsert(jsonString, XContentType.JSON);
可选参数
routing
request.routing("routing");
timeout
request.timeout(TimeValue.timeValueSeconds(1)); // 方式1:1秒
request.timeout("1s"); // 方式2:1秒
版本冲突重试
如果更新文档的时候出现版本冲突,重试几次。
request.retryOnConflict(3);
并发控制
设置并发控制参数,新版的ES已经废弃老的 version 字段。
// 设置版本号
request.setIfSeqNo(2L);
// 设置文档所在的主分区
request.setIfPrimaryTerm(1L);
执行请求
同步执行
UpdateResponse updateResponse = client.update(request, RequestOptions.DEFAULT);
异步执行
- 异步执行通过回调方法进行后续操作,不需要通过“xxxResponse”获取返回结果。
client.updateAsync(request, RequestOptions.DEFAULT, new ActionListener<UpdateResponse>() {
@Override
public void onResponse(UpdateResponse updateResponse) {
// 执行成功
}
@Override
public void onFailure(Exception e) {
// 执行失败
}
});
处理结果
// 索引名
String index = updateResponse.getIndex();
// 文档id
String id = updateResponse.getId();
// 版本号
long version = updateResponse.getVersion();
if (updateResponse.getResult() == DocWriteResponse.Result.CREATED) {
// 成功创建文档
} else if (updateResponse.getResult() == DocWriteResponse.Result.UPDATED) {
// 成功更新文档
}
Update By Query API(批量更新)
ES update by query api主要用于批量更新文档内容,支持设置查询条件限制更新文档的范围。
创建“UpdateByQueryRequest”
// 创建UpdateByQueryRequest对象,设置索引名,支持一次更新多个索引
// 同时更新source1和source2索引内容
UpdateByQueryRequest request = new UpdateByQueryRequest("source1", "source2");
版本冲突
批量更新内容的时候,可能会遇到文档版本冲突的情况,需要设置版本冲突的时候如何处理。
版本冲突解决方案如下:
- proceed:忽略版本冲突,继续执行
- abort:遇到版本冲突,中断执行
request.setConflicts("proceed");
设置查询条件
- (ES的查询语法是非常丰富的,这里仅给出一种写法)
// 设置term查询条件,查询user字段=kimchy的文档内容
request.setQuery(new TermQueryBuilder("user", "kimchy"));
限制更新文档数量
可以限制批量更新文档的数量。
request.setMaxDocs(10);
设置更新内容
Update by query api更新文档内容,仅支持通过【脚本方式】修改文档内容。
request.setScript(
new Script( // 创建inline脚本,使用painless语言。
ScriptType.INLINE, "painless",
"if (ctx._source.user == 'kimchy') {ctx._source.likes++;}",
Collections.emptyMap()));
执行请求
BulkByScrollResponse bulkResponse = client.updateByQuery(request, RequestOptions.DEFAULT);
处理结果
TimeValue timeTaken = bulkResponse.getTook(); // 更新花费时间
boolean timedOut = bulkResponse.isTimedOut(); // 是否超时
long totalDocs = bulkResponse.getTotal(); // 更新文档总数
long updatedDocs = bulkResponse.getUpdated(); // 成功更新了多少文档
long deletedDocs = bulkResponse.getDeleted(); // 删除了多少文档
long versionConflicts = bulkResponse.getVersionConflicts(); // 版本冲突次数
Delete By Query API(批量删除)
Java ES Delete By Query API 主要用于批量删除操作,支持设置查询条件。
创建“DeleteByQueryRequest”
// 创建 DeleteByQueryRequest 对象,支持同时操作多个索引
// 设置批量删除的索引名为:source1和source2
DeleteByQueryRequest request = new DeleteByQueryRequest("source1", "source2");
版本冲突
批量更新内容的时候,可能会遇到文档版本冲突的情况,需要设置版本冲突的时候如何处理。
版本冲突解决方案如下:
- proceed:忽略版本冲突,继续执行
- abort:遇到版本冲突,中断执行
request.setConflicts("proceed");
设置查询条件
- (ES的查询语法是非常丰富的,这里仅给出一种写法)
// 设置term查询条件,查询user字段=kimchy的文档内容
request.setQuery(new TermQueryBuilder("user", "kimchy"));
限制删除文档数量
request.setMaxDocs(10);
执行请求
BulkByScrollResponse bulkResponse = client.deleteByQuery(request, RequestOptions.DEFAULT);
处理结果
TimeValue timeTaken = bulkResponse.getTook(); // 批量操作消耗时间
boolean timedOut = bulkResponse.isTimedOut(); // 是否超时
long totalDocs = bulkResponse.getTotal(); // 涉及文档总数
long deletedDocs = bulkResponse.getDeleted(); // 成功删除文档数量
long versionConflicts = bulkResponse.getVersionConflicts(); // 版本冲突次数
Multi-Get API(批量查询)
Multi-Get API 主要用于根据id集合,批量查询文档内容,支持查询多个索引内容。
创建“MultiGetRequest”
MultiGetRequest request = new MultiGetRequest();
// 通过 MultiGetRequest.Item 对象设置查询参数
// 添加另外一组查询参数,索引名=index, 索引Id=12345
request.add(new MultiGetRequest.Item("index", "12345")); // 文档id
// 添加另外一组查询参数,索引名=index, 索引Id=another_id
request.add(new MultiGetRequest.Item("index", "another_id"));
执行请求
MultiGetResponse response = client.mget(request, RequestOptions.DEFAULT);
处理结果
- 每个 MultiGetItemResponse 对象代表一个查询结果。
// response.getResponses 返回多个 MultiGetItemResponse 对象,每个MultiGetItemResponse对象代表一个查询结果,这里以其中一个结果为例。
// ps: 通常是需要循环遍历response.getResponses返回的结果
MultiGetItemResponse firstItem = response.getResponses()[0];
assertNull(firstItem.getFailure());
GetResponse firstGet = firstItem.getResponse();
String index = firstItem.getIndex(); // 获取索引名
String id = firstItem.getId(); // 获取文档Id
if (firstGet.isExists()) { // 检测文档是否存在
// 获取版本号
long version = firstGet.getVersion();
// 查询文档内容,json字符串格式
String sourceAsString = firstGet.getSourceAsString();
// 查询文档内容,Map对象格式
Map<String, Object> sourceAsMap = firstGet.getSourceAsMap();
} else {
}
Bulk API(批量操作)
ES 的 Bulk API 主要用于在单个请求中,批量执行创建、更新、删除文档操作,避免循环发送大量的 ES 请求。
创建“BulkRequest”
BulkRequest request = new BulkRequest();
添加操作对象
支持 index/update/delete 操作。
批量index操作
// 通过add方法,添加IndexRequest对象,创建文档,下面插入3个文档
// ps: IndexRequest对象,以键值对的方式设置文档内容
request.add(new IndexRequest("posts").id("1").source(XContentType.JSON, "field", "foo"));
request.add(new IndexRequest("posts").id("2").source(XContentType.JSON, "field", "bar"));
request.add(new IndexRequest("posts").id("3").source(XContentType.JSON, "field", "baz"));
混合操作
批量执行文档的删除、更新、创建操作。
request.add(new DeleteRequest("posts", "3"));
request.add(new UpdateRequest("posts", "2").doc(XContentType.JSON,"other", "test"));
request.add(new IndexRequest("posts").id("4").source(XContentType.JSON,"field", "baz"));
可选参数
timeout
request.timeout(TimeValue.timeValueMinutes(2)); // 形式1:2分钟
request.timeout("2m"); // 形式2:2分钟
执行请求
BulkResponse bulkResponse = client.bulk(request, RequestOptions.DEFAULT);
处理结果
if (bulkResponse.hasFailures()) {
// 至少存在一个错误处理
}
// 循环检测批量操作结果
for (BulkItemResponse bulkItemResponse : bulkResponse) {
DocWriteResponse itemResponse = bulkItemResponse.getResponse();
// 根据操作类型检测执行结果
switch (bulkItemResponse.getOpType()) {
case INDEX:
case CREATE:
// 处理创建请求
IndexResponse indexResponse = (IndexResponse) itemResponse;
break;
case UPDATE:
// 处理更新请求
UpdateResponse updateResponse = (UpdateResponse) itemResponse;
break;
case DELETE:
// 处理删除请求
DeleteResponse deleteResponse = (DeleteResponse) itemResponse;
}
}