Java Elasticsearch:Document APIs
Index Api
Java Elasticsearch Index API 主要用于插入或者更新文档数据。
创建“IndexRequest”
// 创建Index Request,设置索引名为: posts
IndexRequest request = new IndexRequest("posts");
// 设置文档ID
request.id("1");
设置文档内容
支持以JSON字符串方式或者map方式设置文档内容。
JSON字符串方式
// 创建 JSON字符串
String jsonString = "{" +
"\"user\":\"kimchy\"," +
"\"postDate\":\"2013-01-30\"," +
"\"message\":\"trying out Elasticsearch\"" +
"}";
// 设置文档内容
request.source(jsonString, XContentType.JSON);
map方式
// 创建 Map
Map<String, Object> jsonMap = new HashMap<>();
jsonMap.put("user", "kimchy");
jsonMap.put("postDate", new Date());
jsonMap.put("message", "trying out Elasticsearch");
// 设置文档内容
request.source(jsonMap);
可选参数
routing
设置路由字段
request.routing("routing");
timeout
设置单个请求超时参数
request.timeout(TimeValue.timeValueSeconds(1));
// or
request.timeout("1s");
Version
设置文档版本
request.version(2);
操作类型
Index api支持两类操作:create 或者 index (默认)
request.opType("create");
执行请求
同步执行
IndexResponse indexResponse = client.index(request, RequestOptions.DEFAULT);
异步执行
- 异步执行通过回调方法进行后续操作,不需要通过“xxxResponse”获取返回结果。
client.indexAsync(request, RequestOptions.DEFAULT, new ActionListener<IndexResponse>() {
@Override
public void onResponse(IndexResponse indexResponse) {
// 请求成功回调函数
}
@Override
public void onFailure(Exception e) {
// 请求失败回调函数
}
});
处理结果
// 获取索引名
String index = indexResponse.getIndex();
// 获取文档ID
String id = indexResponse.getId();
if (indexResponse.getResult() == DocWriteResponse.Result.CREATED) {
// 成功创建文档
} else if (indexResponse.getResult() == DocWriteResponse.Result.UPDATED) {
// 成功更新文档
}
Get Api
Get Api 主要用于根据文档ID查询索引数据。
创建“GetRequest”
// 创建GetRequest,索引名=posts, 文档ID=1
GetRequest getRequest = new GetRequest("posts", "1");
可选参数
是否返回文档内容
- 默认返回文档内容
// 不返回文档内容
request.fetchSourceContext(FetchSourceContext.DO_NOT_FETCH_SOURCE);
返回、过滤:指定字段
// 设置返回指定字段
String[] includes = new String[]{"message", "*Date"};
//String[] includes = Strings.EMPTY_ARRAY;
// 过滤指定字段
String[] excludes = new String[]{"message"};
//String[] excludes = Strings.EMPTY_ARRAY;
FetchSourceContext fetchSourceContext = new FetchSourceContext(true, includes, excludes);
request.fetchSourceContext(fetchSourceContext);
routing
request.routing("routing");
Version
request.version(2);
执行请求
同步执行
GetResponse getResponse = client.get(getRequest, RequestOptions.DEFAULT);
异步执行
- 异步执行通过回调方法进行后续操作,不需要通过“xxxResponse”获取返回结果。
client.getAsync(request, RequestOptions.DEFAULT, new ActionListener<GetResponse>() {
@Override
public void onResponse(GetResponse getResponse) {
// 请求成功回调函数
}
@Override
public void onFailure(Exception e) {
// 请求失败回调函数
}
});
处理结果
String index = getResponse.getIndex();
String id = getResponse.getId();
// 检测索引是否存在
if (getResponse.isExists()) {
// 获取版本号
long version = getResponse.getVersion();
// 获取文档内容,json字符串形式
String sourceAsString = getResponse.getSourceAsString();
// 获取文档内容,map形式
Map<String, Object> sourceAsMap = getResponse.getSourceAsMap();
// 获取文档内容,字节数组形式
byte[] sourceAsBytes = getResponse.getSourceAsBytes();
} else {
//
}
Delete API
Delete API 主要用于根据文档ID删除索引文档。
创建“DeleteRequest”
// 设置索引名=posts, 文档id=1
DeleteRequest request = new DeleteRequest("posts", "1");
可选参数
routing
request.routing("routing");
timeout
request.timeout(TimeValue.timeValueMinutes(2)); //格式1: 2分钟
request.timeout("2m"); //格式2:2分钟
Version
request.version(2);
执行请求
同步执行
DeleteResponse deleteResponse = client.delete(request, RequestOptions.DEFAULT);
异步执行
- 异步执行通过回调方法进行后续操作,不需要通过“xxxResponse”获取返回结果。
client.deleteAsync(request, RequestOptions.DEFAULT, new ActionListener<DeleteResponse>() {
@Override
public void onResponse(DeleteResponse deleteResponse) {
// 请求成功回调函数
}
@Override
public void onFailure(Exception e) {
// 请求失败回调函数
}
});
Update API
Elasticsearch Update API 根据文档ID更新文档内容。
主要支持两种方式更新文档内容:
- 通过脚本的方式更新;
- 更新文档部分字段。
- 如果被更新的文档不存在,也支持通过 upsert api 实现插入操作。
创建“UpdateRequest”
// 创建UpdateRequest请求,索引名=posts,文档ID=1
UpdateRequest request = new UpdateRequest("posts", "1");
设置更新内容
UpdateRequest 对象支持下面几种更新文档内容的方式,根据需要选择一种方式即可:
脚本方式
通过 ES 内置 script 脚本更新文档内容。
// 定义脚本参数
Map<String, Object> parameters = singletonMap("count", 4);
// 创建inline脚本,使用painless语言,实现field字段 + count参数值
Script inline = new Script(ScriptType.INLINE, "painless",
"ctx._source.field += params.count", parameters);
// 设置脚本
request.script(inline);
map方式
通过 map 对象 更新文档部分内容。
// 通过map对象设置需要更新的字段内容
Map<String, Object> jsonMap = new HashMap<>();
jsonMap.put("updated", new Date());
jsonMap.put("reason", "daily update");
// 设置更新内容
request.doc(jsonMap);
json字符串方式
通过 json字符串 方式更新文档部分内容。
String jsonString = "{" +
"\"updated\":\"2017-01-01\"," +
"\"reason\":\"daily update\"" +
"}";
request.doc(jsonString, XContentType.JSON);
upsert方式
通过 Upsert 方式更新文档内容。
- 跟前面三种类似,支持json字符串、map、脚本方式;
- 但是有一点区别,如果被更新的文档不存在,则会执行插入操作。
如:(json字符串方式)
String jsonString = "{\"created\":\"2017-01-01\"}";
request.upsert(jsonString, XContentType.JSON);
可选参数
routing
request.routing("routing");
timeout
request.timeout(TimeValue.timeValueSeconds(1)); // 方式1:1秒
request.timeout("1s"); // 方式2:1秒
版本冲突重试
如果更新文档的时候出现版本冲突,重试几次。
request.retryOnConflict(3);
并发控制
设置并发控制参数,新版的ES已经废弃老的 version 字段。
// 设置版本号
request.setIfSeqNo(2L);
// 设置文档所在的主分区
request.setIfPrimaryTerm(1L);
执行请求
同步执行
UpdateResponse updateResponse = client.update(request, RequestOptions.DEFAULT);
异步执行
- 异步执行通过回调方法进行后续操作,不需要通过“xxxResponse”获取返回结果。
client.updateAsync(request, RequestOptions.DEFAULT, new ActionListener<UpdateResponse>() {
@Override
public void onResponse(UpdateResponse updateResponse) {
// 执行成功
}
@Override
public void onFailure(Exception e) {
// 执行失败
}
});
处理结果
// 索引名
String index = updateResponse.getIndex();
// 文档id
String id = updateResponse.getId();
// 版本号
long version = updateResponse.getVersion();
if (updateResponse.getResult() == DocWriteResponse.Result.CREATED) {
// 成功创建文档
} else if (updateResponse.getResult() == DocWriteResponse.Result.UPDATED) {
// 成功更新文档
}
Update By Query API(批量更新)
ES update by query api主要用于批量更新文档内容,支持设置查询条件限制更新文档的范围。
创建“UpdateByQueryRequest”
// 创建UpdateByQueryRequest对象,设置索引名,支持一次更新多个索引
// 同时更新source1和source2索引内容
UpdateByQueryRequest request = new UpdateByQueryRequest("source1", "source2");
版本冲突
批量更新内容的时候,可能会遇到文档版本冲突的情况,需要设置版本冲突的时候如何处理。
版本冲突解决方案如下:
- proceed:忽略版本冲突,继续执行
- abort:遇到版本冲突,中断执行
request.setConflicts("proceed");
设置查询条件
- (ES的查询语法是非常丰富的,这里仅给出一种写法)
// 设置term查询条件,查询user字段=kimchy的文档内容
request.setQuery(new TermQueryBuilder("user", "kimchy"));
限制更新文档数量
可以限制批量更新文档的数量。
request.setMaxDocs(10);
设置更新内容
Update by query api更新文档内容,仅支持通过【脚本方式】修改文档内容。
request.setScript(
new Script( // 创建inline脚本,使用painless语言。
ScriptType.INLINE, "painless",
"if (ctx._source.user == 'kimchy') {ctx._source.likes++;}",
Collections.emptyMap()));
执行请求
BulkByScrollResponse bulkResponse = client.updateByQuery(request, RequestOptions.DEFAULT);
处理结果
TimeValue timeTaken = bulkResponse.getTook(); // 更新花费时间
boolean timedOut = bulkResponse.isTimedOut(); // 是否超时
long totalDocs = bulkResponse.getTotal(); // 更新文档总数
long updatedDocs = bulkResponse.getUpdated(); // 成功更新了多少文档
long deletedDocs = bulkResponse.getDeleted(); // 删除了多少文档
long versionConflicts = bulkResponse.getVersionConflicts(); // 版本冲突次数
Delete By Query API(批量删除)