Elasticsearch的应用与入门

bigbang 1年前 ⋅ 1439 阅读
ad

前言

Elasticsearch是Elastic公司开源的分布式搜索引擎,基于 JSON 开发而来,具有 RESTful 风格。

通过 Elasticsearch,您能够执行及合并多种类型的搜索(结构化数据、非结构化数据、地理位置、指标),搜索方式随心而变,通过扩展集群规模,能够实现PB级数据的存储与搜索

Elasticsearch底层基于Lucene实现,其实现了方便易用的上层API,无须借助分布式中间件实现集群管理,十分灵活。Elastic公司本身就是围绕这一开源技术而成立的。Elastic推出的产品矩阵叫做Elastic Stack。最新架构图是这样的:

ELK的含义就是:Elasticsearch+Logstash+Kibana。其中Kibana是其可视化组件,Logstash是其日志采集管道。至于Beats的作用笔者还来得及研究。

 

用途

通过前言我们大致知道了Elasticsearch是什么。那么我们能将其应用在哪些场景呢?

  • ELK最典型的应用场景是企业内日志的存储与搜索分析
  • 分布式搜索引擎:如电商的商品搜索,企业内部站内搜索
  • 数据分析:支持数据的metrics, patterns, trends,aggregation 分析,使用机器学习进行自动化时间时序数据的分析

 

如何学习

本篇文章将通过介绍官网的Elasticsearch Guide来了解和学习elasticsearch。通常官方文档更全面且更权威。不过elasticsearch提供的文档更像操作手册,其中对一些架构和概念并没有详细说明,读者可参考Elastic:开发者上手指南学习前几个步骤,由于博客使用的是之前的版本,后面的就不再建议跟着操作了。

当前(2022-04-20)的最新版本为8.1.2,重点入门目录罗列如下:

 

客户端介绍 点我进入

Elasticsearch支持多客户端。如:Java,JavaScript, Go 等。又由于其基于 JSON 开发而来,具有 RESTful 风格。你可以使用HTTP客户端直接访问,Postman Curl,Kibana的debug里更是支持语法提示补全功能。

如您是Java后端开发且项目中使用springboot,可以引入starter 

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-elasticsearch</artifactId>
<version>${spring-boot.version}</version>
</dependency>

或直接使用客户端

<dependency>
<groupId>co.elastic.clients</groupId>
<artifactId>elasticsearch-java</artifactId>
<version>8.1.2</version>
</dependency>

由于elasticsearch官网客户端是和elasticsearch版本一致的,而springboot支持的版本滞后于官方,所以当引入jar包请注意依赖包版本问题。如jakarta.json-api,springboot的版本是1.1.6,而elasticsearch-java依赖的版本是2.0.1。则jakarta.json-api的版本会被父包版本覆盖从而导致找不到类 jakarta.json.spi.JsonProvider

<dependency>
<groupId>jakarta.json</groupId>
<artifactId>jakarta.json-api</artifactId>
<version>2.0.1</version>
<scope>compile</scope>
</dependency>

具体做法是这样

<dependency>
<groupId>co.elastic.clients</groupId>
<artifactId>elasticsearch-java</artifactId>
<version>8.1.3</version>
<exclusions>
<exclusion>
<artifactId>jakarta.json-api</artifactId>
<groupId>jakarta.json</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<artifactId>jakarta.json-api</artifactId>
<groupId>jakarta.json</groupId>
<version>2.0.1</version>
</dependency>

 

Java客户端是基于建造者模式来实现的,可全程使用lamda表达式。javaAPI 与REST APIs  是对应关系,lamda表达式可读性较差,不熟悉的话,比如索引创建,你可以先写出创建索引的Json格式,然后对应着写代码。下面奉献一段(其中包含向量索引的kNN搜索)


import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch._types.mapping.Property;
import co.elastic.clients.elasticsearch._types.mapping.TypeMapping;
import co.elastic.clients.elasticsearch.core.*;
import co.elastic.clients.elasticsearch.core.bulk.BulkResponseItem;
import co.elastic.clients.elasticsearch.core.search.Hit;
import co.elastic.clients.elasticsearch.core.search.TotalHits;
import co.elastic.clients.elasticsearch.core.search.TotalHitsRelation;
import co.elastic.clients.elasticsearch.indices.CreateIndexRequest;
import co.elastic.clients.elasticsearch.indices.CreateIndexResponse;
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import co.elastic.clients.transport.ElasticsearchTransport;
import co.elastic.clients.transport.endpoints.BooleanResponse;
import co.elastic.clients.transport.rest_client.RestClientTransport;
import co.elastic.clients.util.ObjectBuilder;
import com.blueorigin.universe.elasticsearch.model.VectorModel;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.junit.jupiter.api.Test;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

@Slf4j
public class VectorServiceTest {
ElasticsearchClient esClient;
String indexName = "index001";
ObjectMapper objectMapper = new ObjectMapper();
private String index2 = "index002";

{
// Create the low-level client
RestClient restClient = RestClient.builder(
new HttpHost("192.168.2.222", 9200),
new HttpHost("192.168.2.221", 9201)
).build();
// Create the transport with a Jackson mapper
ElasticsearchTransport transport = new RestClientTransport(
restClient, new JacksonJsonpMapper());
// And create the API client
esClient = new ElasticsearchClient(transport);
}

@Test
public void search() throws IOException {
SearchResponse<VectorModel> response = esClient.search(
s -> s.index(indexName).query(q -> q.match(t -> t.field("vectorId").query(1000))),
VectorModel.class);

TotalHits total = response.hits().total();
boolean isExactResult = total.relation() == TotalHitsRelation.Eq;

if (isExactResult) {
log.info("There are " + total.value() + " results");
} else {
log.info("There are more than " + total.value() + " results");
}

List<Hit<VectorModel>> hits = response.hits().hits();
for (Hit<VectorModel> hit : hits) {
VectorModel product = hit.source();
log.info(objectMapper.writeValueAsString(product));
}
}

@Test
public void knnSearch() throws IOException {

Double[] qv = new Double[]{0.04881, -0.33609, -0.13305, 0.11145, -0.08408, -0.03294, -0.40138, 0.19321, -0.02528, -0.02768, -0.07773, -0.23604, 0.12693, -0.07832, 0.03975, -0.23248, -0.04273, 0.10179, 0.15156, -6.8E-4, 0.07414, 0.02857, 0.19306, -0.06342, -0.15033, 0.0331, 0.01185, 0.08438, -0.04065, -0.10001, 0.09478, 0.07242, 0.07154, -0.11093, -0.0125, -0.10403, -0.07861, -0.16341, -0.17495, 0.10467, -0.08135, -0.18041, 0.23096, 0.16601, 0.00806, 0.06222, -0.11909, -0.0224, -0.00351, 0.00517, -0.10414, 0.07032, 0.17009, 0.13883, -0.12384, -0.13548, -0.01265, -0.01102, -0.02224, -0.10055, 0.11512, 0.04149, -0.0423, -0.01998};
List<Double> queryVector = Arrays.asList(qv);
Function<KnnSearchRequest.Builder, ObjectBuilder<KnnSearchRequest>> knnRequest =
builder -> builder.index(indexName).knn(s -> s.field("vector").queryVector(queryVector).k(10).numCandidates(10L));

KnnSearchResponse<VectorModel> response = esClient.knnSearch(knnRequest, VectorModel.class);

TotalHits total = response.hits().total();
boolean isExactResult = total.relation() == TotalHitsRelation.Eq;

if (isExactResult) {
log.info("There are " + total.value() + " results");
} else {
log.info("There are more than " + total.value() + " results");
}

List<Hit<VectorModel>> hits = response.hits().hits();
log.info("query = 0.9999994, vector = {}", queryVector);
for (Hit<VectorModel> hit : hits) {
VectorModel source = hit.source();
log.info("score = {}, vector = {}", hit.score(), source.getVector());
}

}

@Test
public void batchKnnSearch() throws IOException {

File file = new File("vector.txt");
FileReader fileReader = new FileReader(file);
BufferedReader bufferedReader = new BufferedReader(fileReader);
List<VectorModel> vectorModelList = Lists.newArrayListWithExpectedSize(2000);

int count = 0;
int skip = 0;
while (true) {
String line = bufferedReader.readLine();
skip++;
if (skip % 100 != 0) {
continue;
}
if (StringUtils.isBlank(line)) {
log.info("break end");
break;
}
VectorModel vectorModel = objectMapper.readValue(line, VectorModel.class);
vectorModelList.add(vectorModel);
count++;
if (count > 2000) {
log.info("break 2000");
break;
}
}

long start = System.currentTimeMillis();
for (VectorModel vectorModel : vectorModelList) {
KnnSearchResponse<VectorModel> response = esClient.knnSearch(builder -> builder.index(indexName)
.knn(s -> s.field("vector").queryVector(vectorModel.getVector()).k(10).numCandidates(10L)), VectorModel.class);
}
long end = System.currentTimeMillis();

log.info("1 thread query 2000 cost {} ms", end - start);
}

void batchInsert(List<VectorModel> modelList) throws IOException {
BulkRequest.Builder br = new BulkRequest.Builder();
for (VectorModel vec : modelList) {
br.operations(op -> op
.index(idx -> idx
.index(indexName)
.document(vec)
)
);
}
BulkResponse result = esClient.bulk(br.build());
// Log errors, if any
if (result.errors()) {
log.error("Bulk had errors");
for (BulkResponseItem item : result.items()) {
if (item.error() != null) {
log.error(item.error().reason());
}
}
}
}

@Test
public void checkIndexExists() throws IOException {
BooleanResponse exists = esClient.indices().exists(eb -> eb.index(indexName));
if (exists.value()) {
log.info("index {} exists", indexName);
} else {
log.info("index {} not exists", indexName);
}
}

@Test
public void insert100() {
try {
insertFromFile(index2, 100);
} catch (Exception e) {
e.printStackTrace();
}
}

// @Test
public void insertFromFile(String index, int num) throws Exception {
// 索引存在就不执行了
if (existsIndex(index)) return;

// 创建索引
createIndex(index);

// 准备数据
File file = new File("vector.txt");
FileReader fileReader = new FileReader(file);
BufferedReader bufferedReader = new BufferedReader(fileReader);
List<VectorModel> vectorModelList = Lists.newArrayListWithExpectedSize(10000);

// 执行插入
int total = 0;
int count = 0;
while (true) {
String line = bufferedReader.readLine();
if (StringUtils.isBlank(line)) {
total += vectorModelList.size();
batchInsert(vectorModelList);
break;
}
count++;
VectorModel vectorModel = objectMapper.readValue(line, VectorModel.class);
vectorModelList.add(vectorModel);
if (count % 10000 == 0) {
total += vectorModelList.size();
batchInsert(vectorModelList);
vectorModelList = Lists.newArrayListWithExpectedSize(10000);
}
if (total > num) break;
}
log.info("vector total = {}", total);
}

private boolean existsIndex(String index) throws IOException {
BooleanResponse exists = esClient.indices().exists(eb -> eb.index(index));
if (exists.value()) {
log.info("index({}) already exists", index);
return true;
}
return false;
}


private void createIndex(String index) throws IOException {

CreateIndexRequest.Builder indexBuilder = new CreateIndexRequest.Builder();
indexBuilder.index(index);
TypeMapping.Builder tmBuilder = new TypeMapping.Builder();
tmBuilder.properties("vec", new Property.Builder().denseVector(builder -> builder.index(true).dims(64).similarity("dot_product")
.indexOptions(opBuilder -> opBuilder.type("hnsw").m(16).efConstruction(100))).build());
tmBuilder.properties("id", new Property.Builder().long_(pb -> pb.index(false)).build());
TypeMapping typeMapping = tmBuilder.build();
indexBuilder.mappings(typeMapping);
CreateIndexResponse createIndexResponse = esClient.indices().create(indexBuilder.build());

String resIndex = createIndexResponse.index();
Boolean acknowledged = createIndexResponse.acknowledged();
boolean b = createIndexResponse.shardsAcknowledged();
log.info("index create response for {}, acknowledged= {}, shardsAcknowledged= {}", resIndex, acknowledged, b);
}
}

 

关于Webfunny

Webfunny专注于前端监控系统,前端埋点系统的研发。 致力于帮助开发者快速定位问题,帮助企业用数据驱动业务,实现业务数据的快速增长。支持H5/Web/PC前端、微信小程序、支付宝小程序、UniApp和Taro等跨平台框架。实时监控前端网页、前端数据分析、错误统计分析监控和BUG预警,第一时间报警,快速修复BUG!支持私有化部署,Docker容器化部署,可支持千万级PV的日活量!

  点赞 0   收藏 0
  • bigbang
    共发布1篇文章 获得0个收藏
全部评论: 0