Java Elasticsearch:Document APIs
跳到导航
跳到搜索
Index Api
Java Elasticsearch Index API 主要用于插入或者更新文档数据。
创建Index Request
// 创建Index Request,设置索引名为: posts
IndexRequest request = new IndexRequest("posts");
// 设置文档ID
request.id("1");
设置文档内容
支持以JSON字符串形式或者Map形式设置文档内容。
JSON字符串形式:
// 创建 JSON字符串
String jsonString = "{" +
"\"user\":\"kimchy\"," +
"\"postDate\":\"2013-01-30\"," +
"\"message\":\"trying out Elasticsearch\"" +
"}";
// 设置文档内容
request.source(jsonString, XContentType.JSON);
Map形式:
// 创建 Map
Map<String, Object> jsonMap = new HashMap<>();
jsonMap.put("user", "kimchy");
jsonMap.put("postDate", new Date());
jsonMap.put("message", "trying out Elasticsearch");
// 设置文档内容
request.source(jsonMap);
可选参数
routing
设置路由字段
request.routing("routing");
timeout
设置单个请求超时参数
request.timeout(TimeValue.timeValueSeconds(1));
// or
request.timeout("1s");
Version
设置文档版本
request.version(2);
操作类型
Index api支持两类操作:create 或者 index (默认)
request.opType("create");
执行请求
同步执行
IndexResponse indexResponse = client.index(request, RequestOptions.DEFAULT);
异步执行
IndexResponse indexResponse = client.indexAsync(request, RequestOptions.DEFAULT, new ActionListener<IndexResponse>() {
@Override
public void onResponse(IndexResponse indexResponse) {
// 请求成功回调函数
}
@Override
public void onFailure(Exception e) {
// 请求失败回调函数
}
});
处理请求结果
// 获取索引名
String index = indexResponse.getIndex();
// 获取文档ID
String id = indexResponse.getId();
if (indexResponse.getResult() == DocWriteResponse.Result.CREATED) {
// 成功创建文档
} else if (indexResponse.getResult() == DocWriteResponse.Result.UPDATED) {
// 成功更新文档
}
Get Api
Get Api 主要用于根据文档ID查询索引数据。
创建Get Request
// 创建GetRequest,索引名=posts, 文档ID=1
GetRequest getRequest = new GetRequest("posts", "1");
可选参数
是否返回文档内容
- 默认返回文档内容
// 不返回文档内容
request.fetchSourceContext(FetchSourceContext.DO_NOT_FETCH_SOURCE);
返回、过滤:指定字段
// 设置返回指定字段
String[] includes = new String[]{"message", "*Date"};
//String[] includes = Strings.EMPTY_ARRAY;
// 过滤指定字段
String[] excludes = new String[]{"message"};
//String[] excludes = Strings.EMPTY_ARRAY;
FetchSourceContext fetchSourceContext = new FetchSourceContext(true, includes, excludes);
request.fetchSourceContext(fetchSourceContext);
routing
request.routing("routing");
Version
request.version(2);
执行请求
同步执行
GetResponse getResponse = client.get(getRequest, RequestOptions.DEFAULT);
异步执行
- 异步执行通过回调方法进行后续操作,不需要通过“xxxResponse”获取返回结果。
client.getAsync(request, RequestOptions.DEFAULT, new ActionListener<GetResponse>() {
@Override
public void onResponse(GetResponse getResponse) {
// 请求成功回调函数
}
@Override
public void onFailure(Exception e) {
// 请求失败回调函数
}
});
处理请求结果
String index = getResponse.getIndex();
String id = getResponse.getId();
// 检测索引是否存在
if (getResponse.isExists()) {
// 获取版本号
long version = getResponse.getVersion();
// 获取文档内容,json字符串形式
String sourceAsString = getResponse.getSourceAsString();
// 获取文档内容,map形式
Map<String, Object> sourceAsMap = getResponse.getSourceAsMap();
// 获取文档内容,字节数组形式
byte[] sourceAsBytes = getResponse.getSourceAsBytes();
} else {
//
}
Delete API
Delete API 主要用于根据文档ID删除索引文档。
创建Delete Request
// 设置索引名=posts, 文档id=1
DeleteRequest request = new DeleteRequest("posts", "1");
可选参数
routing
request.routing("routing");
timeout
request.timeout(TimeValue.timeValueMinutes(2)); //格式1: 2分钟
request.timeout("2m"); //格式2:2分钟
Version
request.version(2);
执行请求
同步执行
DeleteResponse deleteResponse = client.delete(request, RequestOptions.DEFAULT);
异步执行
- 异步执行通过回调方法进行后续操作,不需要通过“xxxResponse”获取返回结果。
client.deleteAsync(request, RequestOptions.DEFAULT, new ActionListener<DeleteResponse>() {
@Override
public void onResponse(DeleteResponse deleteResponse) {
// 请求成功回调函数
}
@Override
public void onFailure(Exception e) {
// 请求失败回调函数
}
});
Update API
Elasticsearch Update API 根据文档ID更新文档内容。
主要支持两种方式更新文档内容:
- 通过脚本的方式更新;
- 更新文档部分字段。
- 如果被更新的文档不存在,也支持通过 upsert api 实现插入操作。
创建Update Request
// 创建UpdateRequest请求,索引名=posts,文档ID=1
UpdateRequest request = new UpdateRequest("posts", "1");
设置更新内容
UpdateRequest 对象支持下面几种更新文档内容的方式,根据需要选择一种方式即可:
脚本方式
通过 ES 内置 script 脚本更新文档内容。
// 定义脚本参数
Map<String, Object> parameters = singletonMap("count", 4);
// 创建inline脚本,使用painless语言,实现field字段 + count参数值
Script inline = new Script(ScriptType.INLINE, "painless",
"ctx._source.field += params.count", parameters);
// 设置脚本
request.script(inline);
map方式
通过 map 对象 更新文档部分内容。
// 通过map对象设置需要更新的字段内容
Map<String, Object> jsonMap = new HashMap<>();
jsonMap.put("updated", new Date());
jsonMap.put("reason", "daily update");
// 设置更新内容
request.doc(jsonMap);
json字符串方式
通过 json字符串 方式更新文档部分内容。
String jsonString = "{" +
"\"updated\":\"2017-01-01\"," +
"\"reason\":\"daily update\"" +
"}";
request.doc(jsonString, XContentType.JSON);
upsert方式
通过 Upsert 方式更新文档内容。
- 跟前面三种类似,支持json字符串、map、脚本方式;
- 但是有一点区别,如果被更新的文档不存在,则会执行插入操作。
如:(json字符串方式)
String jsonString = "{\"created\":\"2017-01-01\"}";
request.upsert(jsonString, XContentType.JSON);
可选参数
routing
request.routing("routing");
timeout
request.timeout(TimeValue.timeValueSeconds(1)); // 方式1:1秒
request.timeout("1s"); // 方式2:1秒
版本冲突重试
如果更新文档的时候出现版本冲突,重试几次。
request.retryOnConflict(3);
并发控制
设置并发控制参数,新版的ES已经废弃老的 version 字段。
// 设置版本号
request.setIfSeqNo(2L);
// 设置文档所在的主分区
request.setIfPrimaryTerm(1L);
执行请求
同步执行
UpdateResponse updateResponse = client.update(request, RequestOptions.DEFAULT);
异步执行
- 异步执行通过回调方法进行后续操作,不需要通过“xxxResponse”获取返回结果。
client.updateAsync(request, RequestOptions.DEFAULT, new ActionListener<UpdateResponse>() {
@Override
public void onResponse(UpdateResponse updateResponse) {
// 执行成功
}
@Override
public void onFailure(Exception e) {
// 执行失败
}
});
处理请求结果
// 索引名
String index = updateResponse.getIndex();
// 文档id
String id = updateResponse.getId();
// 版本号
long version = updateResponse.getVersion();
if (updateResponse.getResult() == DocWriteResponse.Result.CREATED) {
// 成功创建文档
} else if (updateResponse.getResult() == DocWriteResponse.Result.UPDATED) {
// 成功更新文档
}
Update By Query API(批量更新)