使用Java客户端

前面章节中我们主要使用Kibana的Dev Tools调用Elasticsearch的Restful API执行创建索引、插入数据、检索等操作,但在实际开发中,尤其是对于数据的增删改查,我们不会直接手动用HTTP客户端去调用Restful API,这样实在太麻烦了。ES官方维护了Java/Python/NodeJS/C#/PHP/Go/Ruby/Rust(实验性)的客户端,我们这里以使用最多的Java客户端为例介绍相关用法。

ES的Java客户端版本

ES的Java客户端非常混乱,其大致发展历程如下:

Transport Client:最早的ES官方Java客户端,使用自定义Transport协议(非HTTP)通信,从ES7.x开始被废弃。

Low Level REST Client(LLRC):官方提供的基于HTTP的ES客户端,是ES的Restful API的直接封装,使用起来非常繁琐,一般用户不直接使用LLRC。

High Level REST Client(HLRC):是LLRC的高级封装,提供Java方法级的API,从ES 7.15开始被废弃。

Java API Client:最新官方推荐客户端,支持ES 8.x版本,同样基于LLRC,可以看作HLRC的重写和替代版本。

本系列笔记都是基于ES8.x进行介绍的,因此我们这里选择Java API Client,如果你使用的是ES7.x,那么仍需要使用High Level REST Client,Java API Client类似于HLRC的一个完全重写版本,二者之间的API差别十分巨大不能通用,如果你用的还是老版本ES那么本篇笔记的内容并不适用。

此外,如果你使用的是SpringBoot可能需要考虑使用SpringData Elasticsearch,有关这部分内容可以参考SpringData相关章节,这篇笔记只介绍ES的原生Java客户端,不涉及SpringData Elasticsearch相关内容。

引入Maven依赖

这里我们引入ES的Java API Client依赖,依赖的版本号与我们使用的ES服务端版本相同。

<dependency>
    <groupId>co.elastic.clients</groupId>
    <artifactId>elasticsearch-java</artifactId>
    <version>8.13.4</version>
</dependency>

此外,实际使用ES的Java API Client时,底层还需要提供JSON序列化库,我们这里使用Jackson。

<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-core</artifactId>
    <version>2.21.3</version>
</dependency>
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-annotations</artifactId>
    <version>2.21</version>
</dependency>
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.21.3</version>
</dependency>

创建ES客户端

Java API Client中,ES的客户端被抽象为了ElasticsearchClient类,我们在ES上执行创建索引、插入数据、检索等操作都需要它的实例,这个客户端是线程安全的,我们全局创建一个就行了,但需要记得在程序关闭时将客户端也关闭。不过构造ElasticsearchClient还是略有点复杂的,我们将其抽象为了工厂类EsClientFactory,下面是创建客户端的相关代码。

package com.gacfox.demo;

import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import co.elastic.clients.transport.rest_client.RestClientTransport;
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;

import java.io.IOException;

public class EsClientFactory {
    private static volatile ElasticsearchClient client;
    private static volatile RestClientTransport transport;

    private EsClientFactory() {
    }

    public static ElasticsearchClient getClient() {
        if (client == null) {
            synchronized (EsClientFactory.class) {
                if (client == null) {
                    // 构建RestClient,如果是集群模式HttpHost可以传多个或使用数组
                    RestClient restClient = RestClient.builder(
                            new HttpHost("localhost", 9200)
                    ).build();
                    // 构建ElasticsearchTransport
                    transport = new RestClientTransport(
                            restClient, new JacksonJsonpMapper()
                    );
                    // 构建ElasticsearchClient
                    client = new ElasticsearchClient(transport);
                }
            }
        }
        return client;
    }

    public static synchronized void closeClient() {
        if (transport != null) {
            try {
                transport.close();
                client = null;
                transport = null;
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    }
}

代码中,创建ElasticsearchClient其实分为三层,首先我们创建了RestClient,它其实来自Low Level REST Client(LLRC),前面我们说过ES的Java API Client也是基于LLRC的,在这里就体现了这一层关系,随后我们构造了ElasticsearchTransport,其中指定使用Jackson处理JSON序列化,最后我们实例化了ElasticsearchClient。为了避免getClient()方法被并发调用时出现重复创建客户端的情况,我们使用了同步块和双重检查锁(DCL)处理并发逻辑。

最后,我们还提供了一个关闭客户端的closeClient()方法,但要注意根据官方文档这里需要被关闭的是RestClientTransportElasticsearchClient并不提供close()方法,RestClientTransport会自动关闭底层的HTTP客户端,RestClientclose()就不必调用了,虽然看起来有些奇葩,但确实应该这么写。另外我们还看到代码中有一个包名叫co.elastic.clients.transport.rest_client.RestClientTransport,这不是我写错了,ES中这个类的包就是使用了奇怪的snake_case风格。ES的JavaSDK确实充满了某种诡异和混沌感,但它就是这么设计的。

认证配置

如果ES开启了安全认证,创建RestClient时需要添加配置,以最常见的Basic Auth为例,相关配置代码如下。

BasicCredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(
        AuthScope.ANY,
        new UsernamePasswordCredentials("elastic", "your-password")
);
RestClient restClient = RestClient.builder(new HttpHost("localhost", 9200, "https"))
        .setHttpClientConfigCallback(httpClientBuilder ->
                httpClientBuilder
                    .setDefaultCredentialsProvider(credentialsProvider)
                    // 临时跳过自签名证书校验(生成环境建议导入ES的CA证书,而非信任所有证书)
                    .setSSLContext(SSLContexts.custom().loadTrustMaterial((chain, authType) -> true).build())
                    .setSSLHostnameVerifier(NoopHostnameVerifier.INSTANCE)
        )
        .build();

ES其实还支持很多种高级认证方式,不同的认证方式配置代码也是不同的,有关其它认证方式具体如何编写可以参考官方文档。

连接池与超时配置

RestClient底层使用的其实是旧版本Apache HttpClient 4,RestClient的Builder支持RequestConfigCallbackHttpClientConfigCallback来自定义超时和连接池参数。

RestClient restClient = RestClient.builder(
                new HttpHost("localhost", 9200)
        ).setRequestConfigCallback(requestConfigBuilder ->
                requestConfigBuilder
                        // 连接超时,默认1秒
                        .setConnectTimeout(5000)
                        // 响应超时,默认30秒
                        .setSocketTimeout(60000)
        )
        .setHttpClientConfigCallback(httpClientBuilder ->
                httpClientBuilder
                        // 整个连接池允许的最大总连接数,默认30
                        .setMaxConnTotal(30)
                        // 每个路由(集群模式下每个ES节点)的最大连接数,默认10
                        .setMaxConnPerRoute(10)
        ).build();

生产环境中这几个参数需要根据实际业务进行调整,不要直接使用默认值。

注意:我这里使用的Java API Client版本是8.13.4,它的底层是基于Apache HttpClient 4的,但按目前趋势这个客户端库早晚都要替换到Apache HttpClient 5,因此这里的配置方式未来可能再次发生破坏性变更,具体开发时应根据我们实际使用的Java API Client版本查阅相关文档信息。

实体类与序列化

在Java API Client中,文档与Java对象之间的映射由Jackson自动完成,因此这需要我们对数据模型类添加一些Jackson注解。我们这里定义一个与前面章节一直使用的products索引对应的实体类。

package com.gacfox.demo.model;

import com.fasterxml.jackson.annotation.JsonFormat;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Data;

import java.util.Date;
import java.util.List;

@Data
@JsonIgnoreProperties(ignoreUnknown = true)
public class Product {
    private String id;
    private String name;
    private String description;
    private String category;
    private String brand;
    private Double price;
    private Integer stock;
    private List<String> tags;

    @JsonProperty("is_deleted")
    private Boolean isDeleted;

    @JsonProperty("created_at")
    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
    private Date createdAt;
}

这里有几个细节需要注意:

@JsonIgnoreProperties(ignoreUnknown = true):ES返回的文档可能包含一些Java实体类中没有定义的字段(如 _score),加上这个注解可以避免反序列化报错。

@JsonProperty:ES中字段名使用小写下划线命名风格,Java实体类使用驼峰命名风格,因此需要通过这个注解做映射。

@JsonFormat:日期字段需要指定格式,它要与Mapping中的format保持一致。

此外,这里的id字段实际我们想让它对应到ES文档的_id上,但它不在文档的_source中,因此需要单独手动处理,我们会在后文详细说明。

索引管理

Java API Client提供了完整的索引管理API,不过实际开发中我们可能未必通过代码管理索引,很多团队也采用手动调用Resultful API管理索引,只用代码对数据进行增删改查的模式。两种操作方式其实是等价的,学会其中一种另一种也自然能上手使用。

创建索引

下面例子代码中,我们使用Java代码创建一个名为products的索引,索引的具体定义和04-QueryDSL章节中使用的索引一致。

package com.gacfox.demo;

import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch.indices.CreateIndexResponse;

import java.io.IOException;

public class Main {
    public static void main(String[] args) {
        ElasticsearchClient client = EsClientFactory.getClient();
        try {
            CreateIndexResponse response = client.indices().create(c -> c
                    .index("products")
                    .mappings(m -> m
                            .properties("name", p -> p
                                    .text(t -> t
                                            .analyzer("ik_max_word")
                                            .searchAnalyzer("ik_smart")
                                            .fields("keyword", f -> f.keyword(k -> k))
                                    )
                            )
                            .properties("description", p -> p
                                    .text(t -> t
                                            .analyzer("ik_max_word")
                                            .searchAnalyzer("ik_smart")
                                    )
                            )
                            .properties("category", p -> p.keyword(k -> k))
                            .properties("brand", p -> p.keyword(k -> k))
                            .properties("price", p -> p.double_(d -> d))
                            .properties("stock", p -> p.integer(i -> i))
                            .properties("tags", p -> p.keyword(k -> k))
                            .properties("is_deleted", p -> p.boolean_(b -> b))
                            .properties("created_at", p -> p
                                    .date(d -> d.format("yyyy-MM-dd HH:mm:ss"))
                            )
                            .properties("location", p -> p.geoPoint(g -> g))
                    )
            );
            boolean result = response.acknowledged();
            System.out.println("创建索引结果: " + result);
        } catch (IOException e) {
            throw new RuntimeException(e);
        } finally {
            EsClientFactory.closeClient();
        }
    }
}

Java API Client设计的非常特别,它使用Lambda Builder模式构造请求,我们可以看到每个API方法都接收一个Lambda函数,Lambda的参数是Builder对象,我们可以链式调用设置其中的各项参数,这种API风格设计其实可读性非常差,不过由于Java本身是门强类型语言,Lambda Builder风格在这种情况下反而适合强类型让代码变得不容易写错,这可能也是开发团队做出的一点取舍,牺牲代码可读性换取类型安全的设计。

如果ES中索引已经存在了,它不会重复创建,而是会抛出ElasticsearchException异常。

判断索引是否存在

下面例子可以判断某个索引是否存在。

package com.gacfox.demo;

import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.transport.endpoints.BooleanResponse;

import java.io.IOException;

public class Main {
    public static void main(String[] args) {
        ElasticsearchClient client = EsClientFactory.getClient();
        try {
            BooleanResponse response = client.indices().exists(e -> e.index("products"));
            boolean result = response.value();
            System.out.println("products索引是否存在: " + result);
        } catch (IOException e) {
            throw new RuntimeException(e);
        } finally {
            EsClientFactory.closeClient();
        }
    }
}

删除索引

下面例子可以删除指定名字的索引,如果索引不存在将抛出ElasticsearchException异常。

package com.gacfox.demo;

import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch.indices.DeleteIndexResponse;

import java.io.IOException;

public class Main {
    public static void main(String[] args) {
        ElasticsearchClient client = EsClientFactory.getClient();
        try {
            DeleteIndexResponse response = client.indices().delete(d -> d.index("products"));
            boolean result = response.acknowledged();
            System.out.println("删除索引结果: " + result);
        } catch (IOException e) {
            throw new RuntimeException(e);
        } finally {
            EsClientFactory.closeClient();
        }
    }
}

文档增删改查

新增文档

新增文档使用客户端的index()方法,新增时我们可以手动指定文档ID,也可以让ES自动生成,不过注意如果手动指定这个ID也必须是字符串类型。下面例子我们向索引中插入一条数据,ID是由ES自动生成的,它会在IndexResponse中被返回。

package com.gacfox.demo;

import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch._types.Result;
import co.elastic.clients.elasticsearch.core.IndexResponse;
import com.gacfox.demo.model.Product;

import java.io.IOException;
import java.util.Arrays;
import java.util.Date;

public class Main {
    public static void main(String[] args) {
        ElasticsearchClient client = EsClientFactory.getClient();
        try {
            Product product = new Product();
            product.setName("华为 Mate 60 Pro");
            product.setDescription("华为旗舰智能手机,搭载麒麟芯片,支持卫星通话");
            product.setCategory("手机");
            product.setBrand("华为");
            product.setPrice(6999.00);
            product.setStock(100);
            product.setTags(Arrays.asList("旗舰", "5G", "鸿蒙"));
            product.setIsDeleted(false);
            product.setCreatedAt(new Date());

            IndexResponse response = client.index(i -> i
                    .index("products")
                    .document(product)
            );
            String id = response.id();
            Result result = response.result();
            System.out.println("文档ID: " + id + ", 结果: " + result);
        } catch (IOException e) {
            throw new RuntimeException(e);
        } finally {
            EsClientFactory.closeClient();
        }
    }
}

通过ID获取文档

下面例子代码中,我们通过文档的ID查询数据记录。

package com.gacfox.demo;

import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch.core.GetResponse;
import com.gacfox.demo.model.Product;

import java.io.IOException;

public class Main {
    public static void main(String[] args) {
        ElasticsearchClient client = EsClientFactory.getClient();
        try {
            GetResponse<Product> response = client.get(g -> g
                            .index("products")
                            .id("hF5tT54BkpMXy7k3_sC0"),
                    Product.class
            );
            if (response.found()) {
                Product product = response.source();
                if (product != null) {
                    product.setId(response.id());
                }
                System.out.println("产品: " + product);
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        } finally {
            EsClientFactory.closeClient();
        }
    }
}

前面我们说过,文档的ID在ES中对应的是_id字段,它是文档的元数据因此不在_source中,这也导致Java API Client的设计中ID不在source()的结果里,如果我们确实需要这个值,得单独调用response.id()将其取出。

更新文档

更新文档需要传入ID和更新字段,更新字段需要封装在Map类型中,下面例子代码演示了相关用法。

package com.gacfox.demo;

import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch._types.Result;
import co.elastic.clients.elasticsearch.core.UpdateResponse;
import com.gacfox.demo.model.Product;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class Main {
    public static void main(String[] args) {
        ElasticsearchClient client = EsClientFactory.getClient();
        try {
            Map<String, Object> updateFields = new HashMap<>();
            updateFields.put("price", 6599.00);
            updateFields.put("stock", 80);

            UpdateResponse<Product> response = client.update(u -> u
                            .index("products")
                            .id("hF5tT54BkpMXy7k3_sC0")
                            .doc(updateFields),
                    Product.class
            );
            Result result = response.result();
            System.out.println("更新结果: " + result);
        } catch (IOException e) {
            throw new RuntimeException(e);
        } finally {
            EsClientFactory.closeClient();
        }
    }
}

删除文档

删除文档需要传入文档ID,下面例子演示了删除的写法。

package com.gacfox.demo;

import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch._types.Result;
import co.elastic.clients.elasticsearch.core.DeleteResponse;

import java.io.IOException;

public class Main {
    public static void main(String[] args) {
        ElasticsearchClient client = EsClientFactory.getClient();
        try {
            DeleteResponse response = client.delete(d -> d
                    .index("products")
                    .id("hF5tT54BkpMXy7k3_sC0")
            );
            Result result = response.result();
            System.out.println("删除结果: " + result);
        } catch (IOException e) {
            throw new RuntimeException(e);
        } finally {
            EsClientFactory.closeClient();
        }
    }
}

当然,我们也可以将数据设计为“软删除”的,例如更新一个is_deleted字段表达删除,这样对于需要操作审计和合规留存的场景更友好。

文档检索

下面例子演示了Match Query的写法,其实我们可以发现代码和JSON格式的QueryDSL是完全对应的,会写QueryDSL也就会组装检索代码。至于Term Query、Bool Query、分页、高亮等都是类似的,这里就不重复黏贴代码了。

package com.gacfox.demo;

import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch.core.SearchResponse;
import co.elastic.clients.elasticsearch.core.search.Hit;
import com.gacfox.demo.model.Product;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class Main {
    public static void main(String[] args) {
        ElasticsearchClient client = EsClientFactory.getClient();
        try {
            SearchResponse<Product> response = client.search(s -> s
                            .index("products")
                            .query(q -> q
                                    .match(m -> m
                                            .field("name")
                                            .query("华为")
                                    )
                            ),
                    Product.class
            );
            List<Product> productList = extractHits(response);
            System.out.println("检索结果: " + productList);
        } catch (IOException e) {
            throw new RuntimeException(e);
        } finally {
            EsClientFactory.closeClient();
        }
    }

    private static List<Product> extractHits(SearchResponse<Product> response) {
        List<Product> products = new ArrayList<>();
        for (Hit<Product> hit : response.hits().hits()) {
            Product product = hit.source();
            if (product != null) {
                product.setId(hit.id());
                products.add(product);
            }
        }
        return products;
    }
}

聚合统计

下面例子中,我们使用指标聚合统计了所有产品的数量、最低价、最高价和平均值信息。

package com.gacfox.demo;

import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch._types.aggregations.StatsAggregate;
import co.elastic.clients.elasticsearch.core.SearchResponse;
import com.gacfox.demo.model.Product;

import java.io.IOException;

public class Main {
    public static void main(String[] args) {
        ElasticsearchClient client = EsClientFactory.getClient();
        try {
            SearchResponse<Product> response = client.search(s -> s
                            .index("products")
                            .size(0)
                            .aggregations("price_stats", a -> a
                                    .stats(st -> st.field("price"))
                            ),
                    Product.class
            );

            StatsAggregate stats = response.aggregations()
                    .get("price_stats")
                    .stats();

            System.out.println("商品数量: " + stats.count());
            System.out.println("最低价: " + stats.min());
            System.out.println("最高价: " + stats.max());
            System.out.println("平均价: " + stats.avg());
        } catch (IOException e) {
            throw new RuntimeException(e);
        } finally {
            EsClientFactory.closeClient();
        }
    }
}

下面例子代码中,我们使用桶聚合统计了各品牌的产品数量。

package com.gacfox.demo;

import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch._types.aggregations.StringTermsAggregate;
import co.elastic.clients.elasticsearch._types.aggregations.StringTermsBucket;
import co.elastic.clients.elasticsearch.core.SearchResponse;
import com.gacfox.demo.model.Product;

import java.io.IOException;

public class Main {
    public static void main(String[] args) {
        ElasticsearchClient client = EsClientFactory.getClient();
        try {
            SearchResponse<Product> response = client.search(s -> s
                            .index("products")
                            .size(0)  // 不需要返回文档,只要聚合结果
                            .aggregations("brand_count", a -> a
                                    .terms(t -> t
                                            .field("brand")
                                            .size(20)
                                    )
                            ),
                    Product.class
            );
            StringTermsAggregate brandAgg = response.aggregations()
                    .get("brand_count")
                    .sterms();
            for (StringTermsBucket bucket : brandAgg.buckets().array()) {
                System.out.println(bucket.key().stringValue() + " => " + bucket.docCount());
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        } finally {
            EsClientFactory.closeClient();
        }
    }
}

批处理

当我们需要批量将某种数据源中的数据写入ES时,使用client.index()逐条向ES写入速度非常慢,这种情况一定要使用Bulk批处理。之前的章节我们曾介绍过批处理相关的Restful API并手动在Kibana DevTools中调用,实际上直接用Java API Client做批处理写入是更常见的方式。

package com.gacfox.demo;

import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch.core.BulkRequest;
import co.elastic.clients.elasticsearch.core.BulkResponse;
import co.elastic.clients.elasticsearch.core.bulk.BulkResponseItem;
import com.gacfox.demo.model.Product;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

public class Main {
    public static void main(String[] args) {
        List<Product> products = ... // 假设这里我们从某种数据源读取了products列表

        ElasticsearchClient client = EsClientFactory.getClient();
        try {
            BulkRequest.Builder bulkBuilder = new BulkRequest.Builder();
            for (Product product : products) {
                String id = product.getId();
                bulkBuilder.operations(op -> op
                        .index(i -> i
                                .index("products")
                                .id(id)
                                .document(product)
                        )
                );
            }
            BulkResponse response = client.bulk(bulkBuilder.build());
            if (response.errors()) {
                for (BulkResponseItem item : response.items()) {
                    if (item.error() != null) {
                        System.err.println("批量写入失败,ID: " + item.id() + ", 原因: " + item.error().reason());
                    }
                }
            } else {
                System.out.println("批量写入成功,共 " + response.items().size() + " 条");
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        } finally {
            EsClientFactory.closeClient();
        }
    }
}

Bulk请求并不是越大越好,单次请求体过大会占用过多内存并导致超时,一般建议单次Bulk请求控制在500-1000条且5MB-15MB左右,超出这个范围需要在应用层自行分批处理。

异步操作

Java API Client中提供了一个异步客户端,大部分操作都有对应的异步版本,异步操作会返回CompletableFuture,下面是例子代码我们将之前创建客户端的工厂类替换为了创建异步客户端的版本,并检索了一些数据。

package com.gacfox.demo;

import co.elastic.clients.elasticsearch.ElasticsearchAsyncClient;
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import co.elastic.clients.transport.rest_client.RestClientTransport;
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;

import java.io.IOException;

public class EsAsyncClientFactory {
    private static volatile ElasticsearchAsyncClient client;
    private static volatile RestClientTransport transport;

    private EsAsyncClientFactory() {
    }

    public static ElasticsearchAsyncClient getClient() {
        if (client == null) {
            synchronized (EsAsyncClientFactory.class) {
                if (client == null) {
                    // 构建RestClient,如果是集群模式HttpHost可以传多个或使用数组
                    RestClient restClient = RestClient.builder(
                            new HttpHost("localhost", 9200)
                    ).build();
                    // 构建ElasticsearchTransport
                    transport = new RestClientTransport(
                            restClient, new JacksonJsonpMapper()
                    );
                    // 构建ElasticsearchAsyncClient
                    client = new ElasticsearchAsyncClient(transport);
                }
            }
        }
        return client;
    }

    public static synchronized void closeClient() {
        if (transport != null) {
            try {
                transport.close();
                client = null;
                transport = null;
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    }
}
package com.gacfox.demo;

import co.elastic.clients.elasticsearch.ElasticsearchAsyncClient;
import co.elastic.clients.elasticsearch.core.SearchResponse;
import co.elastic.clients.elasticsearch.core.search.Hit;
import com.gacfox.demo.model.Product;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;

public class Main {
    public static void main(String[] args) {
        // 这里用CountDownLatch阻塞等待一下,避免主线程瞬间退出,接收不到结果
        CountDownLatch latch = new CountDownLatch(1);
        ElasticsearchAsyncClient client = EsAsyncClientFactory.getClient();
        CompletableFuture<SearchResponse<Product>> future = client.search(s -> s
                        .index("products")
                        .query(q -> q
                                .match(m -> m.field("name").query("华为"))
                        ),
                Product.class
        );
        future.whenComplete((response, exception) -> {
            if (exception != null) {
                System.err.println("查询失败: " + exception.getMessage());
            } else {
                List<Product> productList = extractHits(response);
                System.out.println("检索结果: " + productList);
            }

            EsAsyncClientFactory.closeClient();
            latch.countDown();
        });
        try {
            latch.await();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    private static List<Product> extractHits(SearchResponse<Product> response) {
        List<Product> products = new ArrayList<>();
        for (Hit<Product> hit : response.hits().hits()) {
            Product product = hit.source();
            if (product != null) {
                product.setId(hit.id());
                products.add(product);
            }
        }
        return products;
    }
}
作者:Gacfox
版权声明:本网站为非盈利性质,文章如非特殊说明均为原创,版权遵循知识共享协议CC BY-NC-ND 4.0进行授权,转载必须署名,禁止用于商业目的或演绎修改后转载。