Java Elasticsearch:聚合查询(Aggregation)

来自Wikioe
跳到导航 跳到搜索


关于

Elasticsearch 中的聚合查询,类似 SQL 的 SUM/AVG/COUNT/GROUP BY 分组查询,主要用于统计分析场景。


示例:

import org.apache.http.HttpHost;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.index.query.*;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.Avg;
import org.elasticsearch.search.builder.SearchSourceBuilder;

import java.io.IOException;

/**
 * Groups the documents of the {@code order} index by {@code shop_id} with a terms bucket
 * aggregation and computes the average {@code price} of each group with a nested avg metric
 * aggregation.
 *
 * <p>Equivalent SQL: {@code select shop_id, avg(price) as avg_price from order group by shop_id}
 */
public class Main {
    public static void main(String[] args) throws IOException {
        // Build the low-level RestClient; several Elasticsearch node addresses may be listed.
        RestClientBuilder restClientBuilder = RestClient.builder(
                new HttpHost("localhost", 9200, "http"),
                new HttpHost("localhost", 9201, "http"));

        // All requests are sent through RestHighLevelClient. try-with-resources guarantees the
        // client (and its HTTP connections) is closed even when the search throws.
        try (RestHighLevelClient client = new RestHighLevelClient(restClientBuilder)) {
            // Search parameters: match all documents, and return no hits (size 0) because only
            // the aggregation results are of interest.
            SearchSourceBuilder builder = new SearchSourceBuilder();
            builder.query(QueryBuilders.matchAllQuery());
            builder.size(0);

            // Terms bucket aggregation "by_shop" groups by shop_id; the nested avg metric
            // aggregation "avg_price" averages the price field inside each bucket.
            TermsAggregationBuilder aggregationBuilder = AggregationBuilders.terms("by_shop").field("shop_id");
            aggregationBuilder.subAggregation(AggregationBuilders.avg("avg_price").field("price"));
            builder.aggregation(aggregationBuilder);

            // Target the "order" index and attach the fully configured search source.
            SearchRequest searchRequest = new SearchRequest("order");
            searchRequest.source(builder);

            SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);

            // Aggregation results are looked up by the names they were registered under.
            Aggregations aggregations = searchResponse.getAggregations();
            Terms byShopAggregation = aggregations.get("by_shop");

            for (Terms.Bucket bucket : byShopAggregation.getBuckets()) {
                // shop_id is grouped as a number, so the key is a Number. longValue() avoids
                // silently truncating ids that do not fit in an int.
                long shopId = bucket.getKeyAsNumber().longValue();

                // The nested avg result lives on each bucket under its own name.
                Avg avg = bucket.getAggregations().get("avg_price");
                double avgPrice = avg.getValue();

                System.out.println("shop_id=" + shopId + ", avg_price=" + avgPrice);
            }
        }
    }
}

等效SQL:

-- Illustrative only: "order" is a reserved word in most SQL dialects and would need quoting in a real query.
select shop_id, avg(price) as avg_price from order group by shop_id

桶聚合(bucket)

Elasticsearch桶聚合,目的就是数据分组,先将数据按指定的条件分成多个组,然后对每一个组进行统计。


常用桶聚合:

  1. Terms聚合
  2. Histogram聚合
  3. Date histogram聚合
  4. Range聚合
  5. 嵌套聚合(在桶聚合中嵌套子聚合)

Terms聚合

创建聚合条件:

// terms聚合命名为: by_shop, 分组字段为: shop_id
TermsAggregationBuilder byShop = AggregationBuilders.terms("by_shop").field("shop_id");

处理聚合结果:

// Fetch the aggregation section of the response, then the terms result registered as "by_shop".
Aggregations aggregations = searchResponse.getAggregations();
Terms byShopAggregation = aggregations.get("by_shop");

// Each bucket corresponds to one shop_id value.
for (Terms.Bucket shopBucket : byShopAggregation.getBuckets()) {
    long count = shopBucket.getDocCount(); // number of documents in this bucket
    // shop_id is grouped as a number, so the key can be read as a Number.
    // For a string (keyword) field, read it with shopBucket.getKeyAsString() instead.
    int shopId = shopBucket.getKeyAsNumber().intValue();
}

Histogram聚合

创建聚合条件:

// Histogram聚合命名为: prices
HistogramAggregationBuilder histogramAggregationBuilder = AggregationBuilders.histogram("prices")
                                                    .field("price") // 根据price字段值,对数据进行分组
                                                    .interval(100); //  分桶的间隔为100,意思就是price字段值按100间隔分组

处理聚合结果:

// Pull the histogram result back out of the response by its name, "prices".
Aggregations aggregations = searchResponse.getAggregations();
Histogram histogram = aggregations.get("prices");

for (Histogram.Bucket priceBucket : histogram.getBuckets()) {
    String key = priceBucket.getKeyAsString(); // bucket key rendered as a string
    long count = priceBucket.getDocCount();    // documents that fell into this bucket
}

Date histogram聚合

创建聚合条件:

// DateHistogram聚合命名为: sales_over_time
DateHistogramAggregationBuilder dateHistogramAggregationBuilder = AggregationBuilders.dateHistogram("sales_over_time")
                .field("date") // 根据date字段值,对数据进行分组
                .calendarInterval(DateHistogramInterval.MONTH) // 时间分组间隔:DateHistogramInterval.* 序列常量,支持每月,每年,每天等等时间间隔
                .format("yyyy-MM-dd"); // 设置返回结果中桶key的时间格式

处理聚合结果:

// A date histogram result is read through the Histogram interface, by its name "sales_over_time".
Aggregations aggregations = searchResponse.getAggregations();
Histogram histogram = aggregations.get("sales_over_time");

for (Histogram.Bucket monthBucket : histogram.getBuckets()) {
    String key = monthBucket.getKeyAsString(); // key formatted with the builder's date format
    long count = monthBucket.getDocCount();    // documents that fell into this bucket
}

Range聚合

创建聚合条件:

// Range aggregation named "price_ranges". Each range includes its lower bound and
// excludes its upper bound.
// addUnboundedTo(x) means "no lower bound, up to x"; addUnboundedFrom(x) means "from x, no upper bound".
RangeAggregationBuilder rangeAggregationBuilder = AggregationBuilders.range("price_ranges")
                                               .field("price") // bucket documents by the price field
                                               .addUnboundedTo(100.0) // price < 100
                                               .addRange(100.0, 200.0) // 100 <= price < 200
                                               .addUnboundedFrom(200.0); // price >= 200

处理聚合结果:

// Read back the range result registered as "price_ranges".
Aggregations aggregations = searchResponse.getAggregations();
Range range = aggregations.get("price_ranges");

for (Range.Bucket rangeBucket : range.getBuckets()) {
    String key = rangeBucket.getKeyAsString(); // textual key of this range bucket
    long count = rangeBucket.getDocCount();    // documents whose price falls in this range
}

嵌套聚合

桶聚合之间支持互相嵌套,同时桶聚合也可以嵌套多个指标聚合。


创建嵌套聚合条件:【“桶聚合”嵌套两个“指标聚合”】

// Terms *bucket* aggregation: one bucket per shop_id value.
TermsAggregationBuilder byShop = AggregationBuilders.terms("by_shop").field("shop_id");

// Avg metric aggregation nested under by_shop: average price within each shop bucket.
AvgAggregationBuilder avgPriceBuilder = AggregationBuilders.avg("avg_price").field("price");
byShop.subAggregation(avgPriceBuilder);

// Sum metric aggregation, also nested under by_shop: total price within each shop bucket.
SumAggregationBuilder sumPriceBuilder = AggregationBuilders.sum("sum_price").field("price");
byShop.subAggregation(sumPriceBuilder);

// byShop must still be attached to the search source, e.g. SearchSourceBuilder.aggregation(byShop).

处理聚合结果:

// Read back the terms result registered as "by_shop".
Aggregations aggregations = searchResponse.getAggregations();
Terms terms = aggregations.get("by_shop");

for (Terms.Bucket bucket : terms.getBuckets()) {
    String key = bucket.getKeyAsString(); // bucket key (shop_id) as text
    long count = bucket.getDocCount();    // documents in this shop's bucket

    // Sub-aggregation results are stored on each bucket, under the names given when building.
    Aggregations subAggregations = bucket.getAggregations();
    Avg avgPriceResult = subAggregations.get("avg_price");
    Sum sumPriceResult = subAggregations.get("sum_price");

    double avgPrice = avgPriceResult.getValue(); // average price within this bucket
    double sumPrice = sumPriceResult.getValue(); // total price within this bucket
}

示例

// Build the low-level RestClient; several Elasticsearch node addresses may be listed.
RestClientBuilder restClientBuilder = RestClient.builder(
                new HttpHost("localhost", 9200, "http"),
                new HttpHost("localhost", 9201, "http"));

// All requests go through RestHighLevelClient. try-with-resources closes the client (and its
// HTTP connections) even when the search throws.
try (RestHighLevelClient client = new RestHighLevelClient(restClientBuilder)) {
    // Search the "order" index, matching all documents (see earlier chapters for richer queries).
    SearchRequest searchRequest = new SearchRequest("order");
    SearchSourceBuilder builder = new SearchSourceBuilder();
    builder.query(QueryBuilders.matchAllQuery());

    // Terms bucket aggregation named "by_shop", grouping by the shop_id field.
    TermsAggregationBuilder byShop = AggregationBuilders.terms("by_shop").field("shop_id");
    builder.aggregation(byShop);

    searchRequest.source(builder);

    SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);

    // Read back the terms result by its name.
    Aggregations aggregations = searchResponse.getAggregations();
    Terms byShopAggregation = aggregations.get("by_shop");

    for (Terms.Bucket bucket : byShopAggregation.getBuckets()) {
        // shop_id is numeric, so the key can be read as a Number (use getKeyAsString() for
        // string fields). longValue() avoids truncating ids that do not fit in an int.
        long shopId = bucket.getKeyAsNumber().longValue();
        long count = bucket.getDocCount();
        System.out.println("shop_id=" + shopId + ", doc_count=" + count);
    }
}

指标聚合(metrics)

Elasticsearch 指标聚合,就是类似 SQL 的统计函数,指标聚合可以单独使用,也可以跟桶聚合一起使用。


常用指标聚合:

  1. Value Count
  2. Cardinality
  3. Avg
  4. Sum
  5. Max
  6. Min

Value Count

值聚合,用于统计指定字段的值的个数(不去重),类似 SQL 的 count(字段) 函数。


创建聚合条件:

// 创建Value Count指标聚合
// 聚合统计命名为:orders, 统计order_id字段值的数量
ValueCountAggregationBuilder valueCountAggregationBuilder = AggregationBuilders.count("orders").field("order_id");

处理聚合结果:

// Look up the Value Count result by its name, "orders", and print the count.
ValueCount valueCount = searchResponse.getAggregations().get("orders");
System.out.println(valueCount.getValue());

Cardinality

基数聚合,用于统计指定字段中不同值的个数。跟 Value Count 的区别是,基数聚合会去重,不会统计重复的值,类似 SQL 的 count(DISTINCT 字段) 用法。

  • 基数聚合是一种近似算法,统计的结果会有一定误差,不过性能很好。


创建聚合条件:

// 聚合统计命名为:total, 近似统计id字段值的数量
CardinalityAggregationBuilder cardinalityAggregationBuilder = AggregationBuilders.cardinality("total").field("id");

处理聚合结果:

// Look up the Cardinality result by its name, "total", and print the distinct count.
Cardinality cardinality = searchResponse.getAggregations().get("total");
System.out.println(cardinality.getValue());

Avg

创建聚合条件:

AvgAggregationBuilder avgAggregationBuilder = AggregationBuilders.avg("avg_price").field("price");

处理聚合结果:

Aggregations aggregations = searchResponse.getAggregations();
// Look up the Avg result by its name, "avg_price".
Avg avgPrice = aggregations.get("avg_price");
// Print the average price.
System.out.println(avgPrice.getValue());

Sum

创建聚合条件:

SumAggregationBuilder sumAggregationBuilder = AggregationBuilders.sum("total_sale").field("price");

处理聚合结果:

Aggregations aggregations = searchResponse.getAggregations();
// Look up the Sum result by its name, "total_sale".
Sum totalPrice = aggregations.get("total_sale");
// Print the total.
System.out.println(totalPrice.getValue());

Max

创建聚合条件:

MaxAggregationBuilder maxAggregationBuilder = AggregationBuilders.max("max_price").field("price");

处理聚合结果:

// Look up the Max result by its name, "max_price", and print it.
Max maxPrice = searchResponse.getAggregations().get("max_price");
System.out.println(maxPrice.getValue());

Min

创建聚合条件:

MinAggregationBuilder minAggregationBuilder = AggregationBuilders.min("min_price").field("price");

处理聚合结果:

// Look up the Min result by its name, "min_price", and print it.
Min minPrice = searchResponse.getAggregations().get("min_price");
System.out.println(minPrice.getValue());

示例

// Build the low-level RestClient; several Elasticsearch node addresses may be listed.
RestClientBuilder restClientBuilder = RestClient.builder(
                new HttpHost("localhost", 9200, "http"),
                new HttpHost("localhost", 9201, "http"));

// All requests go through RestHighLevelClient. try-with-resources closes the client (and its
// HTTP connections) even when the search throws.
try (RestHighLevelClient client = new RestHighLevelClient(restClientBuilder)) {
    // Search the "order" index, matching all documents (see earlier chapters for richer queries).
    SearchRequest searchRequest = new SearchRequest("order");
    SearchSourceBuilder builder = new SearchSourceBuilder();
    builder.query(QueryBuilders.matchAllQuery());

    // Value Count metric aggregation named "orders": number of order_id values.
    ValueCountAggregationBuilder valueCountAggregationBuilder = AggregationBuilders.count("orders")
            .field("order_id");

    // Sum metric aggregation named "total_sale": total of the price field.
    SumAggregationBuilder sumAggregationBuilder = AggregationBuilders.sum("total_sale")
            .field("price");

    // Several aggregations can be registered on one request, as long as their names differ.
    builder.aggregation(valueCountAggregationBuilder);
    builder.aggregation(sumAggregationBuilder);

    searchRequest.source(builder);

    SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);

    // Read each result back by the name it was registered under.
    Aggregations aggregations = searchResponse.getAggregations();
    ValueCount valueCount = aggregations.get("orders");
    System.out.println(valueCount.getValue());

    Sum sum = aggregations.get("total_sale");
    System.out.println(sum.getValue());
}