yml
# Spring Boot configuration for the Elasticsearch clients used by this demo.
spring:
  data:
    elasticsearch:
      client:
        reactive:
          endpoints: 192.168.209.160:9200
          # Timeout for connecting to Elasticsearch, in milliseconds
          # (original note: the default is 10 seconds / 10000 ms).
          connection-timeout: 10000
          # Timeout for reads and writes, in milliseconds
          # (original note: the default is 5 seconds / 5000 ms).
          socket-timeout: 10000
      # The two properties below are no longer recommended in recent Spring Boot
      # versions. Port 9300 is the port Elasticsearch nodes use to talk to each
      # other; connecting through RestHighLevelClient on port 9200 is recommended
      # instead.
      # cluster-nodes: 127.0.0.1:9300
      # cluster-name: helloElasticsearch
  elasticsearch:
    rest:
      uris: 192.168.209.160:9200
pom
<!-- Lombok: compile-time boilerplate generation (optional, not propagated to dependents). -->
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <optional>true</optional>
</dependency>

<!-- Hutool: the ID generator in this post imports cn.hutool.core classes from this library family. -->
<dependency>
    <groupId>cn.hutool</groupId>
    <artifactId>hutool-captcha</artifactId>
    <version>5.2.0</version>
</dependency>

<!-- Spring MVC for the REST controller. -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>

<!-- Spring Data Elasticsearch starter. -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>
Controller
package com.fwz.tproject.testfunction.controller;

import java.util.Date;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import com.fwz.tproject.testfunction.service.ElasticSearchUtils;
import com.fwz.tproject.testfunction.service.IdGeneratorSnowflake;
import com.fwz.tproject.testfunction.service.OrderService;

/**
 * Demo REST endpoints (under {@code /test}) that exercise {@link ElasticSearchUtils}
 * against a single test index.
 *
 * @author 冯文哲
 * @version 2018-06-11
 */
@RestController
@RequestMapping(value = "/test")
public class MainController {

    /** Name of the index every endpoint in this controller operates on. */
    private static final String INDEX_NAME = "fwztest_index";

    /** Number of sample documents created by {@link #addDoc()}. */
    private static final int SAMPLE_DOC_COUNT = 1000;

    @Autowired
    private IdGeneratorSnowflake idGenerator;

    @Autowired
    private ElasticSearchUtils utilsService;

    /**
     * Creates the test index with 5 primary shards and 1 replica.
     *
     * @return a human-readable message telling whether the creation was acknowledged
     */
    @RequestMapping(value = "createIndex")
    public String elasticsearch() {
        if (utilsService.createIndex(INDEX_NAME, 5, 1, "")) {
            return "创建成功";
        } else {
            return "创建失败";
        }
    }

    /**
     * Submits {@value #SAMPLE_DOC_COUNT} sample documents to the test index.
     *
     * <p>The returned futures are not awaited, so the response message does not
     * reflect the outcome of the individual index operations.
     *
     * @return a fixed confirmation message
     */
    @RequestMapping(value = "addDoc")
    public String addDoc() {
        for (int j = 0; j < SAMPLE_DOC_COUNT; j++) {
            Map<String, Object> map = new ConcurrentHashMap<String, Object>();
            map.put("author_id", idGenerator.snowflakeId());
            // NOTE(review): the sample title/content below is placeholder demo text;
            // consider replacing it with neutral wording.
            map.put("title", "这有" + j + "个中国人");
            map.put("content", "其中有" + (j - 1) + "个老黑");
            map.put("create_date", new Date());
            utilsService.addDoc(INDEX_NAME, String.valueOf(idGenerator.snowflakeId()), map);
        }
        return "新增成功";
    }

    /**
     * Deletes the document with the given id from the test index.
     *
     * @param id document id
     * @return a fixed confirmation message
     */
    @RequestMapping(value = "deleteDoc")
    public String deleteDoc(String id) {
        // NOTE(review): the boolean returned by the service is ignored here; confirm it
        // reliably reflects the outcome before surfacing it to the caller.
        utilsService.deleteDoc(INDEX_NAME, id);
        return "删除成功";
    }

    /**
     * Applies a partial update to the document with the given id.
     *
     * <p>This demo endpoint has no way to receive new field values, so it sends an
     * empty JSON object as the partial document. An empty string (used previously)
     * is not a valid JSON document.
     *
     * @param id document id
     * @return a message telling whether the service reported success
     */
    @RequestMapping(value = "updateDoc")
    public String updateDoc(String id) {
        boolean updated = utilsService.updateDoc(INDEX_NAME, id, "{}");
        return updated ? "修改成功" : "修改失败";
    }

    /**
     * Fetches the source of the document with the given id.
     *
     * @param id document id
     * @return whatever {@link ElasticSearchUtils#getDoc(String, String)} returns for the id
     */
    @RequestMapping(value = "selectDoc")
    public Map<String, Object> selectDoc(String id) {
        return utilsService.getDoc(INDEX_NAME, id);
    }
}
Utils
package com.fwz.tproject.testfunction.service;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.ClearScrollRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.bucket.terms.Terms.Bucket;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.Avg;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.AsyncResult;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.stereotype.Service;

/**
 * Thin helper around {@link RestHighLevelClient} that demonstrates index creation,
 * document CRUD, bulk requests, searches, aggregations and scrolling.
 *
 * <p>Every method catches {@link IOException} from the client, logs it and reports
 * failure through its return value. Runtime exceptions raised by the client are not
 * caught here.
 */
@Service
@EnableAsync
public class ElasticSearchUtils {

    private static final Logger logger = LoggerFactory.getLogger(ElasticSearchUtils.class);

    /** Primary shard count used when the caller passes {@code null}. */
    private static final int DEFAULT_SHARDS = 5;

    /** Replica count used when the caller passes {@code null}. */
    private static final int DEFAULT_REPLICAS = 1;

    /** Mapping applied when the caller passes no mapping (null or blank). */
    private static final String DEFAULT_MAPPING = "{\"properties\":{"
            + "\"author_id\":{\"type\":\"long\"},"
            + "\"title\":{\"type\":\"text\",\"analyzer\":\"standard\","
            + "\"fields\":{\"keyword\":{\"type\":\"keyword\",\"ignore_above\":256}}},"
            + "\"content\":{\"type\":\"text\",\"analyzer\":\"ik_max_word\","
            + "\"fields\":{\"keyword\":{\"type\":\"keyword\",\"ignore_above\":256}}},"
            + "\"create_date\":{\"type\":\"date\"}"
            + "}}";

    @Autowired
    private RestHighLevelClient restClient;

    /**
     * Creates an index.
     *
     * @param indexName name of the index to create
     * @param shards    number of primary shards; {@code null} means {@value #DEFAULT_SHARDS}
     * @param replicas  number of replicas; {@code null} means {@value #DEFAULT_REPLICAS}
     * @param mapping   mapping definition as a JSON string; {@code null} or blank means
     *                  the built-in demo mapping
     * @return {@code true} if the creation was acknowledged, {@code false} if it was not
     *         acknowledged or an {@link IOException} occurred
     */
    public boolean createIndex(String indexName, Integer shards, Integer replicas, String mapping) {
        logger.info(restClient.toString());

        int shardCount = shards != null ? shards : DEFAULT_SHARDS;
        int replicaCount = replicas != null ? replicas : DEFAULT_REPLICAS;
        String mappingJson = (mapping == null || mapping.trim().isEmpty()) ? DEFAULT_MAPPING : mapping;

        CreateIndexRequest request = new CreateIndexRequest(indexName);
        request.settings(Settings.builder()
                .put("number_of_shards", shardCount)
                .put("number_of_replicas", replicaCount));
        request.mapping(mappingJson, XContentType.JSON);
        request.setTimeout(TimeValue.timeValueMinutes(1));

        try {
            CreateIndexResponse createIndexResponse = restClient.indices().create(request, RequestOptions.DEFAULT);
            boolean acknowledged = createIndexResponse.isAcknowledged();
            logger.info("是否获取ACK:{}", acknowledged);
            return acknowledged;
        } catch (IOException e) {
            logger.error("Failed to create index {}", indexName, e);
            return false;
        }
    }

    /**
     * Indexes one document asynchronously.
     *
     * @param index  index name
     * @param id     document id; when {@code null} or empty, no id is set on the request
     * @param source document fields
     * @return a future holding {@code true} on success, {@code false} if an
     *         {@link IOException} occurred
     * @author fwzz
     * @version created 2021-01-27 17:10:42
     */
    @Async
    public Future<Boolean> addDoc(String index, String id, Map<String, Object> source) {
        IndexRequest indexRequest = new IndexRequest(index).source(source);
        if (id != null && !"".equals(id)) {
            indexRequest = indexRequest.id(id);
        }
        try {
            IndexResponse res = restClient.index(indexRequest, RequestOptions.DEFAULT);
            logger.info("新增数据成功,ID为: {}", res.getId());
            return new AsyncResult<Boolean>(true);
        } catch (IOException e) {
            logger.error("Failed to index document into {}", index, e);
            return new AsyncResult<Boolean>(false);
        }
    }

    /**
     * Deletes one document.
     *
     * @param index index name
     * @param id    document id
     * @return {@code true} only if the response result is {@code DELETED}; {@code false}
     *         for any other result or if an {@link IOException} occurred
     * @author fwzz
     * @version created 2021-01-27 17:19:26
     */
    public boolean deleteDoc(String index, String id) {
        DeleteRequest deleteRequest = new DeleteRequest(index, id);
        try {
            DeleteResponse res = restClient.delete(deleteRequest, RequestOptions.DEFAULT);
            logger.info(res.getResult().toString());
            boolean deleted = res.getResult() == DocWriteResponse.Result.DELETED;
            if (deleted) {
                logger.info("删除数据成功,ID为: {}", res.getId());
            } else {
                logger.warn("Document was not deleted, index: {}, id: {}", index, id);
            }
            return deleted;
        } catch (IOException e) {
            logger.error("Failed to delete document {} from {}", id, index, e);
            return false;
        }
    }

    /**
     * Applies a partial update to one document.
     *
     * @param index  index name
     * @param id     document id
     * @param source partial document as a JSON string
     * @return {@code true} on success, {@code false} if an {@link IOException} occurred
     * @author fwzz
     * @version created 2021-01-27 17:25:43
     */
    public boolean updateDoc(String index, String id, String source) {
        // The content type must be given explicitly: a lone String would otherwise bind
        // to doc(Object...), which expects field/value pairs and rejects an odd count.
        UpdateRequest updateRequest = new UpdateRequest(index, id).doc(source, XContentType.JSON);
        try {
            UpdateResponse res = restClient.update(updateRequest, RequestOptions.DEFAULT);
            logger.info("修改数据成功,ID为: {}", res.getId());
            return true;
        } catch (IOException e) {
            logger.error("Failed to update document {} in {}", id, index, e);
            return false;
        }
    }

    /**
     * Fetches the source of one document.
     *
     * @param index index name
     * @param id    document id
     * @return the document source as a map, or {@code null} if the document does not
     *         exist or an {@link IOException} occurred
     * @author fwzz
     * @version created 2021-01-27 17:27:33
     */
    public Map<String, Object> getDoc(String index, String id) {
        GetRequest getRequest = new GetRequest(index, id);
        try {
            GetResponse res = restClient.get(getRequest, RequestOptions.DEFAULT);
            if (!res.isExists()) {
                // A missing document has no source map; bail out before dereferencing it.
                logger.info("Document not found, index: {}, id: {}", index, id);
                return null;
            }
            logger.info("查询数据成功,ID为: {}", res.getId());
            logger.info("查询数据成功,字符串数据为: {}", res.getSourceAsString());
            Map<String, Object> map = res.getSourceAsMap();
            logger.info("查询数据成功,Map数据为: {}", map);
            return map;
        } catch (IOException e) {
            logger.error("Failed to fetch document {} from {}", id, index, e);
            return null;
        }
    }

    /**
     * Bulk demo: sends an index, a delete and an update request in a single bulk call
     * and logs the outcome of each item.
     *
     * @param index index name
     * @param id    id used by the delete and update items
     * @return {@code true} if the bulk call itself completed (item failures are only
     *         logged), {@code false} if an {@link IOException} occurred
     * @author fwzz
     * @version created 2021-01-27 19:35:34
     */
    public Boolean bulkRequest(String index, String id) {
        BulkRequest request = new BulkRequest();
        // The maps stand in for the data to index / update. They are passed through the
        // (Map, XContentType) overloads; the (XContentType, Object...) overloads expect
        // field/value pairs and reject a single map argument.
        request.add(new IndexRequest(index).source(new HashMap<String, Object>(), XContentType.JSON));
        request.add(new DeleteRequest(index, id));
        request.add(new UpdateRequest(index, id).doc(new HashMap<String, Object>(), XContentType.JSON));

        try {
            BulkResponse bulkResponse = restClient.bulk(request, RequestOptions.DEFAULT);
            for (BulkItemResponse bulkItemResponse : bulkResponse) {
                if (bulkItemResponse.isFailed()) {
                    BulkItemResponse.Failure failure = bulkItemResponse.getFailure();
                    logger.info(failure.getMessage());
                    continue;
                }
                logItemResult(bulkItemResponse);
            }
            return true;
        } catch (IOException e) {
            logger.error("Bulk request against {} failed", index, e);
            return false;
        }
    }

    /** Logs the result of one successful bulk item, cast to its operation-specific type. */
    private void logItemResult(BulkItemResponse bulkItemResponse) {
        DocWriteResponse itemResponse = bulkItemResponse.getResponse();
        DocWriteRequest.OpType opType = bulkItemResponse.getOpType();
        if (opType == DocWriteRequest.OpType.INDEX || opType == DocWriteRequest.OpType.CREATE) {
            IndexResponse indexResponse = (IndexResponse) itemResponse;
            logger.info(indexResponse.getResult().toString());
        } else if (opType == DocWriteRequest.OpType.UPDATE) {
            UpdateResponse updateResponse = (UpdateResponse) itemResponse;
            logger.info(updateResponse.getResult().toString());
        } else if (opType == DocWriteRequest.OpType.DELETE) {
            DeleteResponse deleteResponse = (DeleteResponse) itemResponse;
            logger.info(deleteResponse.getResult().toString());
        }
    }

    /**
     * Search demo: runs a term query on {@code city} against {@code gdp_tops*} and logs
     * every hit. The original author notes that a search can fully replace a get request.
     *
     * @return {@code true} on success, {@code false} if an {@link IOException} occurred
     * @author fwzz
     * @version created 2021-01-27 19:43:57
     */
    public Boolean searchQuery() {
        // Target index pattern.
        SearchRequest searchRequest = new SearchRequest("gdp_tops*");
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        // Query definition.
        sourceBuilder.query(QueryBuilders.termQuery("city", "北京市"));
        sourceBuilder.timeout(new TimeValue(60, TimeUnit.SECONDS));
        searchRequest.source(sourceBuilder);

        try {
            SearchResponse response = restClient.search(searchRequest, RequestOptions.DEFAULT);
            Arrays.stream(response.getHits().getHits()).forEach(hit -> {
                logger.info(hit.getIndex());
                logger.info(String.valueOf(hit.getSourceAsMap()));
            });
            logger.info(response.getHits().getTotalHits().toString());
            return true;
        } catch (IOException e) {
            logger.error("Search query failed", e);
            return false;
        }
    }

    /**
     * Aggregation demo: groups {@code gdp_tops*} documents by {@code company.keyword},
     * computes the average {@code age} per group, and logs the hits plus the average of
     * the {@code "Elastic"} bucket when that bucket exists.
     *
     * @return {@code true} on success, {@code false} if an {@link IOException} occurred
     * @author fwzz
     * @version created 2021-01-27 19:46:31
     */
    public Boolean aggsQuery() {
        // Target index pattern.
        SearchRequest searchRequest = new SearchRequest("gdp_tops*");
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        TermsAggregationBuilder aggregation = AggregationBuilders.terms("by_company").field("company.keyword");
        aggregation.subAggregation(AggregationBuilders.avg("average_age").field("age"));
        searchSourceBuilder.aggregation(aggregation);
        searchSourceBuilder.timeout(new TimeValue(60, TimeUnit.SECONDS));
        // Paging could be enabled with searchSourceBuilder.from(0) and size(5).
        searchRequest.source(searchSourceBuilder);

        try {
            // Approach 1: walk the hits.
            SearchResponse response = restClient.search(searchRequest, RequestOptions.DEFAULT);
            Arrays.stream(response.getHits().getHits()).forEach(hit -> {
                logger.info(hit.getIndex());
                logger.info(String.valueOf(hit.getSourceAsMap()));
            });

            // Approach 2: read the aggregation result.
            Aggregations aggregations = response.getAggregations();
            Terms byCompanyAggregation = aggregations == null ? null : aggregations.get("by_company");
            Bucket elasticBucket = byCompanyAggregation == null ? null
                    : byCompanyAggregation.getBucketByKey("Elastic");
            if (elasticBucket != null) {
                Avg averageAge = elasticBucket.getAggregations().get("average_age");
                logger.info("average_age of bucket Elastic: {}", averageAge.getValue());
            } else {
                // The bucket is absent when no document matches the key.
                logger.info("No bucket with key Elastic in aggregation by_company");
            }

            logger.info(response.getHits().getTotalHits().toString());
            return true;
        } catch (IOException e) {
            logger.error("Aggregation query failed", e);
            return false;
        }
    }

    /**
     * Asynchronous search demo: submits a search against {@code gdp_tops*} and shows, in
     * the listener, how to read the different parts of each hit.
     *
     * @return always {@code true}; the search outcome is only reported to the listener
     * @author fwzz
     * @version created 2021-01-27 19:50:00
     */
    public Boolean searchAsync() {
        // Target index pattern.
        SearchRequest searchRequest = new SearchRequest("gdp_tops*");
        restClient.searchAsync(searchRequest, RequestOptions.DEFAULT, new ActionListener<SearchResponse>() {
            @Override
            public void onResponse(SearchResponse searchResponse) {
                for (SearchHit hit : searchResponse.getHits().getHits()) {
                    // Index, id and score of the hit.
                    String index = hit.getIndex();
                    String id = hit.getId();
                    float score = hit.getScore();
                    // Source as a JSON string and as a map.
                    String sourceAsString = hit.getSourceAsString();
                    Map<String, Object> sourceAsMap = hit.getSourceAsMap();
                    logger.debug("hit index={} id={} score={} source={}", index, id, score, sourceAsString);
                    if (sourceAsMap == null) {
                        continue;
                    }
                    // Individual fields: a string, a list and a nested object.
                    String documentTitle = (String) sourceAsMap.get("title");
                    @SuppressWarnings("unchecked") // demo assumes "user" holds a list
                    List<Object> users = (List<Object>) sourceAsMap.get("user");
                    @SuppressWarnings("unchecked") // demo assumes "innerObject" holds an object
                    Map<String, Object> innerObject = (Map<String, Object>) sourceAsMap.get("innerObject");
                    logger.debug("title={} user={} innerObject={}", documentTitle, users, innerObject);
                }
            }

            @Override
            public void onFailure(Exception e) {
                logger.error(e.toString());
            }
        });
        return true;
    }

    /**
     * Scroll demo: when a query matches too much data, {@code SearchRequest.scroll()}
     * yields a scroll id that is then passed to {@link SearchScrollRequest} to fetch the
     * next page. The scroll is cleared before returning.
     *
     * @return {@code true} on success, {@code false} if an {@link IOException} occurred
     * @author fwzz
     * @version created 2021-01-27 20:00:14
     */
    public Boolean searchScroll() {
        SearchRequest searchRequest = new SearchRequest("posts");
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        searchSourceBuilder.query(QueryBuilders.termQuery("city", "北京市"));
        searchSourceBuilder.size(5);
        searchRequest.source(searchSourceBuilder);
        searchRequest.scroll(TimeValue.timeValueMinutes(1L));

        String scrollId = null;
        try {
            SearchResponse searchResponse = restClient.search(searchRequest, RequestOptions.DEFAULT);
            scrollId = searchResponse.getScrollId();

            SearchScrollRequest scrollRequest = new SearchScrollRequest(scrollId);
            scrollRequest.scroll(TimeValue.timeValueSeconds(30));
            SearchResponse searchScrollResponse = restClient.scroll(scrollRequest, RequestOptions.DEFAULT);
            scrollId = searchScrollResponse.getScrollId();

            SearchHits hits = searchScrollResponse.getHits();
            logger.info(hits.getTotalHits().toString());
            return true;
        } catch (IOException e) {
            logger.error("Scroll search failed", e);
            return false;
        } finally {
            releaseScroll(scrollId);
        }
    }

    /** Best-effort release of a scroll; failures are logged and never propagated. */
    private void releaseScroll(String scrollId) {
        if (scrollId == null) {
            return;
        }
        ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
        clearScrollRequest.addScrollId(scrollId);
        try {
            restClient.clearScroll(clearScrollRequest, RequestOptions.DEFAULT);
        } catch (IOException | RuntimeException e) {
            logger.warn("Failed to clear scroll {}", scrollId, e);
        }
    }
}
全局ID生成工具类
package com.fwz.tproject.testfunction.service; import javax.annotation.PostConstruct; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; import cn.hutool.core.lang.Snowflake; import cn.hutool.core.net.NetUtil; import cn.hutool.core.util.IdUtil; @Componentpublic class IdGeneratorSnowflake { private long workerId = 0; private long datacenterId = 1; private Snowflake snowflake = IdUtil.createSnowflake(workerId, datacenterId); private static final Logger log = LoggerFactory.getLogger(IdGeneratorSnowflake.class.getName()); // 依赖注入完成后执行该方法,进行一些初始化工作 @PostConstruct public void init() { try { workerId = NetUtil.ipv4ToLong(NetUtil.getLocalhostStr()); log.info("当前机器的workerId: {}", workerId); } catch (Exception e) { e.printStackTrace(); log.warn("当前机器的workerId获取失败", e); // 释放ID workerId = NetUtil.getLocalhostStr().hashCode(); } } // 使用默认机房号获取ID public synchronized long snowflakeId() { return snowflake.nextId(); } // 自己制定机房号获取ID public synchronized long snowflakeId(long workerId, long datacenterId) { Snowflake snowflake = IdUtil.createSnowflake(workerId, datacenterId); return snowflake.nextId(); } /** * 生成的是不带-的字符审,类似于: 73a64edf935d4952a287739a66f96e06 * * @return */ public String simpleUUID() { return IdUtil.simpleUUID(); } /** * 生成的UUID是带-的字符串,类似于: b12b6401-6f9c-4351-b2b6-d8afc9ab9272 * * @return */ public String randomUUID() { return IdUtil.randomUUID(); } public static void main(String[] args) { IdGeneratorSnowflake f = new IdGeneratorSnowflake(); for (int i = 0; i < 1000; i++) { System.out.println(f.snowflakeId(0, 0)); } } }
转自:https://www.cnblogs.com/fengwenzhee/p/14336734.html