“Java Elasticsearch:Document APIs”的版本间差异
小无编辑摘要 |
|||
第1行: | 第1行: | ||
[[category:ElasticSearch | [[category:Java&ElasticSearch]] | ||
== Index Api == | == Index Api == |
2023年3月31日 (五) 22:03的最新版本
Index Api
Java Elasticsearch Index API 主要用于插入或者更新文档数据。
创建“IndexRequest”
// 创建Index Request,设置索引名为: posts
IndexRequest request = new IndexRequest("posts");
// 设置文档ID
request.id("1");
设置文档内容
支持以JSON字符串方式或者map方式设置文档内容。
JSON字符串方式
// 创建 JSON字符串
String jsonString = "{" +
"\"user\":\"kimchy\"," +
"\"postDate\":\"2013-01-30\"," +
"\"message\":\"trying out Elasticsearch\"" +
"}";
// 设置文档内容
request.source(jsonString, XContentType.JSON);
map方式
// 创建 Map
Map<String, Object> jsonMap = new HashMap<>();
jsonMap.put("user", "kimchy");
jsonMap.put("postDate", new Date());
jsonMap.put("message", "trying out Elasticsearch");
// 设置文档内容
request.source(jsonMap);
可选参数
routing
设置路由字段
request.routing("routing");
timeout
设置单个请求超时参数
request.timeout(TimeValue.timeValueSeconds(1));
// or
request.timeout("1s");
Version
设置文档版本
request.version(2);
操作类型
Index api支持两类操作:create 或者 index (默认)
request.opType("create");
执行请求
同步执行
IndexResponse indexResponse = client.index(request, RequestOptions.DEFAULT);
异步执行
- 异步执行通过回调方法进行后续操作,不需要通过“xxxResponse”获取返回结果。
client.indexAsync(request, RequestOptions.DEFAULT, new ActionListener<IndexResponse>() {
@Override
public void onResponse(IndexResponse indexResponse) {
// 请求成功回调函数
}
@Override
public void onFailure(Exception e) {
// 请求失败回调函数
}
});
处理结果
// 获取索引名
String index = indexResponse.getIndex();
// 获取文档ID
String id = indexResponse.getId();
if (indexResponse.getResult() == DocWriteResponse.Result.CREATED) {
// 成功创建文档
} else if (indexResponse.getResult() == DocWriteResponse.Result.UPDATED) {
// 成功更新文档
}
Get Api
Get Api 主要用于根据文档ID查询索引数据。
创建“GetRequest”
// 创建GetRequest,索引名=posts, 文档ID=1
GetRequest getRequest = new GetRequest("posts", "1");
可选参数
是否返回文档内容
- 默认返回文档内容
// 不返回文档内容
request.fetchSourceContext(FetchSourceContext.DO_NOT_FETCH_SOURCE);
返回、过滤:指定字段
// 设置返回指定字段
String[] includes = new String[]{"message", "*Date"};
//String[] includes = Strings.EMPTY_ARRAY;
// 过滤指定字段
String[] excludes = new String[]{"message"};
//String[] excludes = Strings.EMPTY_ARRAY;
FetchSourceContext fetchSourceContext = new FetchSourceContext(true, includes, excludes);
request.fetchSourceContext(fetchSourceContext);
routing
request.routing("routing");
Version
request.version(2);
执行请求
同步执行
GetResponse getResponse = client.get(getRequest, RequestOptions.DEFAULT);
异步执行
- 异步执行通过回调方法进行后续操作,不需要通过“xxxResponse”获取返回结果。
client.getAsync(request, RequestOptions.DEFAULT, new ActionListener<GetResponse>() {
@Override
public void onResponse(GetResponse getResponse) {
// 请求成功回调函数
}
@Override
public void onFailure(Exception e) {
// 请求失败回调函数
}
});
处理结果
String index = getResponse.getIndex();
String id = getResponse.getId();
// 检测索引是否存在
if (getResponse.isExists()) {
// 获取版本号
long version = getResponse.getVersion();
// 获取文档内容,json字符串形式
String sourceAsString = getResponse.getSourceAsString();
// 获取文档内容,map形式
Map<String, Object> sourceAsMap = getResponse.getSourceAsMap();
// 获取文档内容,字节数组形式
byte[] sourceAsBytes = getResponse.getSourceAsBytes();
} else {
//
}
Delete API
Delete API 主要用于根据文档ID删除索引文档。
创建“DeleteRequest”
// 设置索引名=posts, 文档id=1
DeleteRequest request = new DeleteRequest("posts", "1");
可选参数
routing
request.routing("routing");
timeout
request.timeout(TimeValue.timeValueMinutes(2)); //格式1: 2分钟
request.timeout("2m"); //格式2:2分钟
Version
request.version(2);
执行请求
同步执行
DeleteResponse deleteResponse = client.delete(request, RequestOptions.DEFAULT);
异步执行
- 异步执行通过回调方法进行后续操作,不需要通过“xxxResponse”获取返回结果。
client.deleteAsync(request, RequestOptions.DEFAULT, new ActionListener<DeleteResponse>() {
@Override
public void onResponse(DeleteResponse deleteResponse) {
// 请求成功回调函数
}
@Override
public void onFailure(Exception e) {
// 请求失败回调函数
}
});
Update API
Elasticsearch Update API 根据文档ID更新文档内容。
主要支持两种方式更新文档内容:
- 通过脚本的方式更新;
- 更新文档部分字段。
- 如果被更新的文档不存在,也支持通过 upsert api 实现插入操作。
创建“UpdateRequest”
// 创建UpdateRequest请求,索引名=posts,文档ID=1
UpdateRequest request = new UpdateRequest("posts", "1");
设置更新内容
UpdateRequest 对象支持下面几种更新文档内容的方式,根据需要选择一种方式即可:
脚本方式
通过 ES 内置 script 脚本更新文档内容。
// 定义脚本参数
Map<String, Object> parameters = singletonMap("count", 4);
// 创建inline脚本,使用painless语言,实现field字段 + count参数值
Script inline = new Script(ScriptType.INLINE, "painless",
"ctx._source.field += params.count", parameters);
// 设置脚本
request.script(inline);
map方式
通过 map 对象 更新文档部分内容。
// 通过map对象设置需要更新的字段内容
Map<String, Object> jsonMap = new HashMap<>();
jsonMap.put("updated", new Date());
jsonMap.put("reason", "daily update");
// 设置更新内容
request.doc(jsonMap);
json字符串方式
通过 json字符串 方式更新文档部分内容。
String jsonString = "{" +
"\"updated\":\"2017-01-01\"," +
"\"reason\":\"daily update\"" +
"}";
request.doc(jsonString, XContentType.JSON);
upsert方式
通过 Upsert 方式更新文档内容。
- 跟前面三种类似,支持json字符串、map、脚本方式;
- 但是有一点区别,如果被更新的文档不存在,则会执行插入操作。
如:(json字符串方式)
String jsonString = "{\"created\":\"2017-01-01\"}";
request.upsert(jsonString, XContentType.JSON);
可选参数
routing
request.routing("routing");
timeout
request.timeout(TimeValue.timeValueSeconds(1)); // 方式1:1秒
request.timeout("1s"); // 方式2:1秒
版本冲突重试
如果更新文档的时候出现版本冲突,重试几次。
request.retryOnConflict(3);
并发控制
设置并发控制参数,新版的ES已经废弃老的 version 字段。
// 设置版本号
request.setIfSeqNo(2L);
// 设置文档所在的主分区
request.setIfPrimaryTerm(1L);
执行请求
同步执行
UpdateResponse updateResponse = client.update(request, RequestOptions.DEFAULT);
异步执行
- 异步执行通过回调方法进行后续操作,不需要通过“xxxResponse”获取返回结果。
client.updateAsync(request, RequestOptions.DEFAULT, new ActionListener<UpdateResponse>() {
@Override
public void onResponse(UpdateResponse updateResponse) {
// 执行成功
}
@Override
public void onFailure(Exception e) {
// 执行失败
}
});
处理结果
// 索引名
String index = updateResponse.getIndex();
// 文档id
String id = updateResponse.getId();
// 版本号
long version = updateResponse.getVersion();
if (updateResponse.getResult() == DocWriteResponse.Result.CREATED) {
// 成功创建文档
} else if (updateResponse.getResult() == DocWriteResponse.Result.UPDATED) {
// 成功更新文档
}
Update By Query API(批量更新)
ES update by query api主要用于批量更新文档内容,支持设置查询条件限制更新文档的范围。
创建“UpdateByQueryRequest”
// 创建UpdateByQueryRequest对象,设置索引名,支持一次更新多个索引
// 同时更新source1和source2索引内容
UpdateByQueryRequest request = new UpdateByQueryRequest("source1", "source2");
版本冲突
批量更新内容的时候,可能会遇到文档版本冲突的情况,需要设置版本冲突的时候如何处理。
版本冲突解决方案如下:
- proceed:忽略版本冲突,继续执行
- abort:遇到版本冲突,中断执行
request.setConflicts("proceed");
设置查询条件
- (ES的查询语法是非常丰富的,这里仅给出一种写法)
// 设置term查询条件,查询user字段=kimchy的文档内容
request.setQuery(new TermQueryBuilder("user", "kimchy"));
限制更新文档数量
可以限制批量更新文档的数量。
request.setMaxDocs(10);
设置更新内容
Update by query api更新文档内容,仅支持通过【脚本方式】修改文档内容。
request.setScript(
new Script( // 创建inline脚本,使用painless语言。
ScriptType.INLINE, "painless",
"if (ctx._source.user == 'kimchy') {ctx._source.likes++;}",
Collections.emptyMap()));
执行请求
BulkByScrollResponse bulkResponse = client.updateByQuery(request, RequestOptions.DEFAULT);
处理结果
TimeValue timeTaken = bulkResponse.getTook(); // 更新花费时间
boolean timedOut = bulkResponse.isTimedOut(); // 是否超时
long totalDocs = bulkResponse.getTotal(); // 更新文档总数
long updatedDocs = bulkResponse.getUpdated(); // 成功更新了多少文档
long deletedDocs = bulkResponse.getDeleted(); // 删除了多少文档
long versionConflicts = bulkResponse.getVersionConflicts(); // 版本冲突次数
Delete By Query API(批量删除)
Java ES Delete By Query API 主要用于批量删除操作,支持设置查询条件。
创建“DeleteByQueryRequest”
// 创建 DeleteByQueryRequest 对象,支持同时操作多个索引
// 设置批量删除的索引名为:source1和source2
DeleteByQueryRequest request = new DeleteByQueryRequest("source1", "source2");
版本冲突
批量更新内容的时候,可能会遇到文档版本冲突的情况,需要设置版本冲突的时候如何处理。
版本冲突解决方案如下:
- proceed:忽略版本冲突,继续执行
- abort:遇到版本冲突,中断执行
request.setConflicts("proceed");
设置查询条件
- (ES的查询语法是非常丰富的,这里仅给出一种写法)
// 设置term查询条件,查询user字段=kimchy的文档内容
request.setQuery(new TermQueryBuilder("user", "kimchy"));
限制删除文档数量
request.setMaxDocs(10);
执行请求
BulkByScrollResponse bulkResponse = client.deleteByQuery(request, RequestOptions.DEFAULT);
处理结果
TimeValue timeTaken = bulkResponse.getTook(); // 批量操作消耗时间
boolean timedOut = bulkResponse.isTimedOut(); // 是否超时
long totalDocs = bulkResponse.getTotal(); // 涉及文档总数
long deletedDocs = bulkResponse.getDeleted(); // 成功删除文档数量
long versionConflicts = bulkResponse.getVersionConflicts(); // 版本冲突次数
Multi-Get API(批量查询)
Multi-Get API 主要用于根据id集合,批量查询文档内容,支持查询多个索引内容。
创建“MultiGetRequest”
MultiGetRequest request = new MultiGetRequest();
// 通过 MultiGetRequest.Item 对象设置查询参数
// 添加另外一组查询参数,索引名=index, 索引Id=12345
request.add(new MultiGetRequest.Item("index", "12345")); // 文档id
// 添加另外一组查询参数,索引名=index, 索引Id=another_id
request.add(new MultiGetRequest.Item("index", "another_id"));
执行请求
MultiGetResponse response = client.mget(request, RequestOptions.DEFAULT);
处理结果
- 每个 MultiGetItemResponse 对象代表一个查询结果。
// response.getResponses 返回多个 MultiGetItemResponse 对象,每个MultiGetItemResponse对象代表一个查询结果,这里以其中一个结果为例。
// ps: 通常是需要循环遍历response.getResponses返回的结果
MultiGetItemResponse firstItem = response.getResponses()[0];
assertNull(firstItem.getFailure());
GetResponse firstGet = firstItem.getResponse();
String index = firstItem.getIndex(); // 获取索引名
String id = firstItem.getId(); // 获取文档Id
if (firstGet.isExists()) { // 检测文档是否存在
// 获取版本号
long version = firstGet.getVersion();
// 查询文档内容,json字符串格式
String sourceAsString = firstGet.getSourceAsString();
// 查询文档内容,Map对象格式
Map<String, Object> sourceAsMap = firstGet.getSourceAsMap();
} else {
}
Bulk API(批量操作)
ES 的 Bulk API 主要用于在单个请求中,批量执行创建、更新、删除文档操作,避免循环发送大量的 ES 请求。
创建“BulkRequest”
BulkRequest request = new BulkRequest();
添加操作对象
支持 index/update/delete 操作。
批量index操作
// 通过add方法,添加IndexRequest对象,创建文档,下面插入3个文档
// ps: IndexRequest对象,以键值对的方式设置文档内容
request.add(new IndexRequest("posts").id("1").source(XContentType.JSON, "field", "foo"));
request.add(new IndexRequest("posts").id("2").source(XContentType.JSON, "field", "bar"));
request.add(new IndexRequest("posts").id("3").source(XContentType.JSON, "field", "baz"));
混合操作
批量执行文档的删除、更新、创建操作。
request.add(new DeleteRequest("posts", "3"));
request.add(new UpdateRequest("posts", "2").doc(XContentType.JSON,"other", "test"));
request.add(new IndexRequest("posts").id("4").source(XContentType.JSON,"field", "baz"));
可选参数
timeout
request.timeout(TimeValue.timeValueMinutes(2)); // 形式1:2分钟
request.timeout("2m"); // 形式2:2分钟
执行请求
BulkResponse bulkResponse = client.bulk(request, RequestOptions.DEFAULT);
处理结果
if (bulkResponse.hasFailures()) {
// 至少存在一个错误处理
}
// 循环检测批量操作结果
for (BulkItemResponse bulkItemResponse : bulkResponse) {
DocWriteResponse itemResponse = bulkItemResponse.getResponse();
// 根据操作类型检测执行结果
switch (bulkItemResponse.getOpType()) {
case INDEX:
case CREATE:
// 处理创建请求
IndexResponse indexResponse = (IndexResponse) itemResponse;
break;
case UPDATE:
// 处理更新请求
UpdateResponse updateResponse = (UpdateResponse) itemResponse;
break;
case DELETE:
// 处理删除请求
DeleteResponse deleteResponse = (DeleteResponse) itemResponse;
}
}