ES常用API示例

Published on 2021-07-20 16:23 in 分类: 博客 with 狂盗一枝梅
分类: 博客

以下示例基于elasticsearch 5.3.0

<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <version>5.3.0</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch.plugin</groupId>
    <artifactId>transport-netty4-client</artifactId>
    <version>5.3.0</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>transport</artifactId>
    <version>5.3.0</version>
</dependency>

一、配置和连接

配置

elasticsearch:
  cluster-name: my-eshop
  servers:
    - 127.0.0.1:9300
    - 127.0.0.1:9301
  xpack:
    enable: false
    username: elastic
    password: changeme
@Bean
public TransportClient transportClient(ElasticSearchProperties elasticSearchProperties) {
    log.info(elasticSearchProperties.toString());
    XPackProperties xpack = elasticSearchProperties.getXpack();
    TransportClient client;
    if(xpack.getEnable()){
        client = new PreBuiltXPackTransportClient(Settings.builder()
                                                  .put("cluster.name", elasticSearchProperties.getClusterName())
                                                  .put("xpack.security.user", elasticSearchProperties.getXpack().getUsername() + ":" + elasticSearchProperties.getXpack().getPassword())
                                                  .build());
    }else{
        Settings settings = Settings.builder()
            .put("cluster.name", elasticSearchProperties.getClusterName())
            .build();
        client = new PreBuiltTransportClient(settings);
    }
    if(CollectionUtils.isEmpty(elasticSearchProperties.getServers())){
        throw new RuntimeException("ES集群配置不可为空");
    }
    TransportClient finalClient = client;
    elasticSearchProperties.getServers().forEach(item->{
        try {
            String[] split = item.split(":");
            finalClient.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(split[0]), Integer.parseInt(split[1])));
        } catch (UnknownHostException e) {
            log.error("",e);
        }
    });
    return finalClient;
}
@Data
@Component
@ConfigurationProperties(prefix= "elasticsearch")
public class ElasticSearchProperties {
    private String clusterName;
    private List<String> servers;
    private XPackProperties xpack;
}
@Data
@Component
@ConfigurationProperties(prefix = "elasticsearch.xpack")
public class XPackProperties {
    private String username;
    private String password;
    private Boolean enable = false;
}

二、序列化、反序列化工具类

在正式开始前,需要先创建好序列化、反序列化工具类

import cn.hutool.core.date.DatePattern;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.*;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;

import java.io.IOException;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

/**
 * @author kdyzm
 * @date 2021/6/21
 */
public class ObjectMapperFactory {

    public static ObjectMapper getObjectMapper() {
        ObjectMapper objectMapper = new ObjectMapper();
        JavaTimeModule timeModule = new JavaTimeModule();
        timeModule.addSerializer(LocalDateTime.class, new LocalDateTimeSerializer());
        timeModule.addDeserializer(LocalDateTime.class, new LocalDateTimeDeserializer());
        objectMapper.registerModule(timeModule);
        return objectMapper;
    }

    /**
     * Jackson序列化需要的类
     */
    static class LocalDateTimeSerializer extends JsonSerializer<LocalDateTime> {
        @Override
        public void serialize(LocalDateTime localDateTime, JsonGenerator jsonGenerator, SerializerProvider serializerProvider) throws IOException {
            jsonGenerator.writeString(localDateTime.format(DateTimeFormatter.ofPattern(DatePattern.NORM_DATETIME_PATTERN)));
        }
    }

    /**
     * Jackson反序列化需要的类
     */
    static class LocalDateTimeDeserializer extends JsonDeserializer<LocalDateTime> {
        @Override
        public LocalDateTime deserialize(JsonParser jsonParser, DeserializationContext deserializationContext) throws IOException, JsonProcessingException {
            String timestamp = jsonParser.getValueAsString();
            return LocalDateTime.parse(timestamp, DateTimeFormatter.ofPattern(DatePattern.NORM_DATETIME_PATTERN));
        }
    }
}

三、API-数据保存

1. 保存一条数据

try {
    transportClient.prepareIndex("test-index", "test")
        .setSource(ObjectMapperFactory.getObjectMapper().writeValueAsBytes(entity), XContentType.JSON)
        .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
        .get();
} catch (JsonProcessingException e) {
    log.error("", e);
}

2. 批量保存数据

BulkRequestBuilder bulkRequestBuilder = transportClient.prepareBulk();
ObjectMapper objectMapper = ObjectMapperFactory.getObjectMapper();
entities.forEach(item -> {
    try {
        bulkRequestBuilder.add(transportClient.prepareIndex("test-index", "test")
                               .setSource(objectMapper.writeValueAsBytes(item), XContentType.JSON));
    } catch (JsonProcessingException e) {
        log.error("", e);
    }
});
bulkRequestBuilder.execute().actionGet();

四、API-数据删除

1.删除一条数据

DeleteByQueryAction.INSTANCE
        .newRequestBuilder(transportClient)
        .filter(QueryBuilders.termQuery("code", code))
        .source("test-index")
        .get();

2.批量删除数据

public void deleteBatch(List<String> codes) {
    DeleteByQueryAction.INSTANCE
        .newRequestBuilder(transportClient)
        .filter(QueryBuilders.termsQuery("code", codes))
        .source(EsIndexEnum.RLZY_SERVICE.indexName())
        .get();
}

五、API-数据更新

1.更新一条数据

transportClient.prepareUpdate()
        .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
        .setIndex(EsIndexEnum.RLZY_SERVICE.indexName())
        .setType("data")
        .setId(id)
        .setDoc(objectMapper.writeValueAsBytes(oldValue), XContentType.JSON)
        .get();

2.批量更新某个字段

UpdateByQueryRequestBuilder updateByQuery = UpdateByQueryAction.INSTANCE.newRequestBuilder(transportClient);
updateByQuery.source("test-index")
    .filter(QueryBuilders.termsQuery("code", codes))
    .script(new Script("ctx._source['show']='" + false + "'"));
BulkByScrollResponse response = updateByQuery.get();
long updated = response.getUpdated();
log.info("批量修改了{}条数据", updated);

这种script修改字段值的方式如果比较频繁,会报错

Caused by: org.elasticsearch.common.breaker.CircuitBreakingException: [script] Too many dynamic script compilations within one minute, max: [15/min]; please use on-disk, indexed, or scripts with parameters instead; this limit can be changed by the [script.max_compilations_per_minute] setting
	at org.elasticsearch.script.ScriptService.checkCompilationLimit(ScriptService.java:351)
	at org.elasticsearch.script.ScriptService.compile(ScriptService.java:304)
	... 15 common frames omitted

需要修改script.max_compilations_per_minute的大小

PUT _cluster/settings
{
    "transient" : {
        "script.max_compilations_rate" : "100/1m"
    }
}

六、API-数据查询

1.查询一条数据

public T queryOneByBaseId(String code) {
    QueryBuilder termQuery = QueryBuilders.termQuery("code", code);
    SearchRequestBuilder searchRequestBuilder = transportClient.prepareSearch("test_index")
        .setQuery(termQuery)
        .setFetchSource(true);
    SearchResponse searchResponse = searchRequestBuilder
        .execute()
        .actionGet();
    SearchHits hits = searchResponse.getHits();
    if (hits.getTotalHits() <= 0) {
        log.info("未查询到 code={} 的记录", baseId);
        return null;
    }
    if (hits.getTotalHits() > 1) {
        throw new RuntimeException("code=" + baseId + " 存在多条记录");
    }
    ObjectMapper objectMapper = ObjectMapperFactory.getObjectMapper();
    SearchHit data = hits.getAt(0);
    String sourceAsString = data.getSourceAsString();
    try {
        return objectMapper.readValue(sourceAsString, Class<T>);
    } catch (JsonProcessingException e) {
        log.error("", e);
    } catch (IOException e) {
        e.printStackTrace();
    }
    return null;
}

2.多条件查询

public List<T> queryByCondition(int pageNum, int pageSize, boolean showAll, T req) {
        BoolQueryBuilder mixQuery = QueryBuilders.boolQuery();
        boolean queryAll = true;
        if (!StringUtils.isEmpty(req.getTitle())) {
            MultiMatchQueryBuilder query = QueryBuilders
                    .multiMatchQuery(req.getTitle(), "title");
            mixQuery.must(query);
            queryAll = false;
        }
        if (!showAll) {
            TermQueryBuilder query = QueryBuilders
                    .termQuery(req.getShow().toString(), Boolean.TRUE);
            mixQuery.must(query);
            queryAll = false;
        }
        FieldSortBuilder sortBuilder = null;
        if (queryAll) {
            MatchAllQueryBuilder matchAllQueryBuilder = QueryBuilders.matchAllQuery();
            mixQuery.must(matchAllQueryBuilder);
            sortBuilder = SortBuilders.fieldSort("create_date").order(SortOrder.DESC).unmappedType("create_date");
        }
        SearchRequestBuilder searchRequestBuilder = transportClient.prepareSearch("test_index")
                .setQuery(mixQuery)
                .setFrom((pageNum - 1) * pageSize)
                .setSize(pageSize)
                .setFetchSource(true);
        if (Objects.nonNull(sortBuilder)) {
            searchRequestBuilder.addSort(sortBuilder);
        }
        SearchResponse searchResponse = searchRequestBuilder
                .execute()
                .actionGet();
        SearchHits hits = searchResponse.getHits();
        List<T> data = new ArrayList<>();
        ObjectMapper objectMapper = ObjectMapperFactory.getObjectMapper();
        if (hits.getTotalHits() <= 0) {
            return new ArrayList<>();
        }
        hits.forEach(item -> {
            T entity = null;
            try {
                entity = objectMapper.readValue(item.getSourceAsString(), T.class);
            } catch (JsonProcessingException e) {
                log.error("", e);
            } catch (IOException e) {
                e.printStackTrace();
            }
            data.add(entity);
        });
        return data;
    }

3.分组统计查询

List<T> list = new ArrayList<>();
TermsAggregationBuilder aggregationBuilder = AggregationBuilders.terms("inf_group_statistic").field("inf_group").size(10000);
SearchRequestBuilder aggregationBuilders = transportClient.prepareSearch(SearchModuleEnum.IMFORMATION_MODULE.index().split(","))
    .setQuery(QueryBuilders.boolQuery().must(QueryBuilders.termQuery("show", "true")))
    .addAggregation(aggregationBuilder)
    .setFetchSource(false);
SearchResponse searchResponse1 = aggregationBuilders.get();
Terms terms = searchResponse1.getAggregations().get("inf_group_statistic");
List<Terms.Bucket> buckets = terms.getBuckets();
for (Terms.Bucket bucket : buckets) {
    String key = bucket.getKeyAsString();
    long docCount = bucket.getDocCount();
    T res = new T();
    res.setIndustry(key);
    res.setNum((int) docCount);
    list.add(res);
}

其最终形成的查询和下面的形式差不多

POST {{es-host}}/test_index/data/_search

{
    "query": {
        "bool": {
            "must": [
                {
                    "term": {
                        "show": true
                    }
                },
                {
                    "exists": {
                        "field": "event_classification"
                    }
                }
            ]
        }
    },
    "size": 0,
    "aggs": {
        "group_by_tags": {
            "terms": {
                "size": 1000,
                "field": "event_classification"
            }
        }
    }
}

4.统计总数量和总金额

SearchRequestBuilder aggregationBuilders = transportClient.prepareSearch(
    EsIndexEnum.POLICY_HALL.indexName())
    .setQuery(mixQuery)
    .addAggregation(AggregationBuilders.sum("matchMoney").field("money"))
    .setFetchSource(false);
SearchResponse searchResponse1 = aggregationBuilders.get();
Sum matchMoney = searchResponse1.getAggregations().get("matchMoney");
result.setMatchMoney(BigDecimal.valueOf(matchMoney.getValue()).setScale(2,BigDecimal.ROUND_HALF_UP).toPlainString());
result.setMatchNum((int) hits.getTotalHits());

5.高亮显示


SearchResponse searchResponse = searchRequestBuilder
                .execute()
                .actionGet();
        SearchHits hits = searchResponse.getHits();HighlightBuilder highlightBuilder = new HighlightBuilder()
                .preTags("<span class=\"searchHighlight\">")
                .postTags("</span>")
                .field("title")
                .field("second_title")
                .field("content");

SearchRequestBuilder searchRequestBuilder = transportClient.prepareSearch(
    SearchModuleEnum.get(req.getModuleType()).index().split(","))
    .setQuery(mixQuery)
    .highlighter(highlightBuilder)
    .setFrom((req.getPageNum() - 1) * req.getPageSize())
    .setSize(req.getPageSize())
    .setFetchSource(true);

SearchResponse searchResponse = searchRequestBuilder
                .execute()
                .actionGet();
SearchHits hits = searchResponse.getHits();

hits.forEach(item -> {
    String index = item.getIndex();
    String sourceAsString = item.getSourceAsString();
    Map<String, HighlightField> highlightFieldMap = item.highlightFields();

    HighlightField titleHighlight = highlightFieldMap.get("title");
    StringBuilder title = new StringBuilder();
    if (Objects.nonNull(titleHighlight)) {
        for (Text fragment : titleHighlight.fragments()) {
            title.append(fragment);
        }
    }
    
    StringBuilder secondTitle = new StringBuilder();
    HighlightField secondTitleHighlight = highlightFieldMap.get("second_title");
    if (Objects.nonNull(secondTitleHighlight)) {
        for (Text fragment : secondTitleHighlight.fragments()) {
            secondTitle.append(fragment);
        }
    }

    StringBuilder content = new StringBuilder();
    HighlightField contentHighlight = highlightFieldMap.get("content");
    if (Objects.nonNull(contentHighlight)) {
        for (Text fragment : contentHighlight.fragments()) {
            content.append(fragment);
        }
    }
});

6.高权重查询

比如titel查询的权重是content的两倍

MultiMatchQueryBuilder query = QueryBuilders
    .multiMatchQuery(req.getSearchKey())
    .field("title", 2)
    .field("second_title", 1.2f)
    .field("content", AbstractQueryBuilder.DEFAULT_BOOST);
mixQuery.must(query);

7.随机查询

有的时候想要任意返回几条数据,可以使用随机查询的方式

ScriptSortBuilder scriptSortBuilder = SortBuilders.scriptSort(new Script("Math.random()"), ScriptSortBuilder.ScriptSortType.NUMBER);

#es #elasticsearch
目录