记一次最全的Java+Elasticsearch数据保存与查询操作 您所在的位置:网站首页 java存储数据的方法有哪些 记一次最全的Java+Elasticsearch数据保存与查询操作

记一次最全的Java+Elasticsearch数据保存与查询操作

2024-06-26 07:17| 来源: 网络整理| 查看: 265

前言

      最近公司有一个需求,就是要将网关(Java开发)的日志写入Elasticsearch中,并进行日志查询与展示,由于楼主之前没有接触过Elasticsearch,更谈不上Java Api的使用了,于是苦思良久,各种查阅资料,最后功夫不负有心人,终于完成了功能。现将过程及方法论分享如下,希望可以帮到他人。

一、相关Jar包的引入

      Java使用Elasticsearch 所需要的Jar包依赖如下:

org.elasticsearch.client transport 6.5.4 org.elasticsearch elasticsearch 6.5.4 org.elasticsearch elasticsearch-core 6.5.4 org.elasticsearch elasticsearch-secure-sm 6.5.4 org.elasticsearch elasticsearch-x-content 6.5.4 org.elasticsearch elasticsearch-cli 6.5.4 org.elasticsearch jna 4.5.1 org.elasticsearch.plugin transport-netty4-client 6.5.4 org.elasticsearch.plugin reindex-client 6.5.4 org.elasticsearch.client elasticsearch-rest-client 6.5.4 org.elasticsearch.plugin lang-mustache-client 6.5.4 org.elasticsearch.plugin percolator-client 6.5.4 org.elasticsearch.plugin parent-join-client 6.5.4 org.elasticsearch.plugin rank-eval-client 6.5.4 com.google.code.gson gson 2.8.0 二、创建连接

在使用java调用Elasticsearch之前,需要先创建客户端与Elasticsearch服务器之间的连接。创建连接的代码片段如下:

import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.transport.client.PreBuiltTransportClient; import java.net.InetAddress; import java.net.UnknownHostException; public class ESUtil { private static volatile TransportClient client; /** *采用双端检索机制实现客户端为单例模式 * @param clusterName 你的Elasticsearch集群名称 * @param hostName 你的Elasticsearch的主机Ip地址 * @param hostPort 你的Elasticsearch与客户端通信的端口,一般为9300 * @return TransportClient * @throws UnknownHostException */ @SuppressWarnings("resource") public static TransportClient getClient(String clusterName, String hostName, int hostPort) { if (client == null) { synchronized (TransportClient.class) { try { client = new PreBuiltTransportClient(Settings.builder().put("cluster.name", clusterName).build()) .addTransportAddress(new TransportAddress(InetAddress.getByName(hostName), hostPort)); } catch (UnknownHostException e) { e.printStackTrace(); } } } return client; } } 三、初始化Elasticsearch中的索引模板

1、初始化索引模板相关代码如下:

/** * 判断索引是否存在 * @param client * @param indexName * @return */ public boolean existIndex(TransportClient client,String indexName){ boolean existIndex = false; try { existIndex = client.admin().indices().exists(new IndicesExistsRequest().indices(new String[]{indexName})) .actionGet().isExists(); } catch (Exception e) { e.printStackTrace(); } return existIndex; } /** * 创建并初始化索引 *@param clazz 需要创建索引的实体类 * @param indexName 需要创建的索引名称 */ @SuppressWarnings("rawtypes") public void initIndex(TransportClient client, String indexName, Class clazz){ try { if(existIndex(client,indexName)){ return; //如果该索引存在,则不创建 } CreateIndexRequestBuilder cirBuilder = client.admin().indices().prepareCreate(indexName); XContentBuilder mapping = XmlContentUtil.getXContentBuilderMapping(clazz); cirBuilder.addMapping("doc",mapping); cirBuilder.execute().actionGet(); } catch (IOException e) { e.printStackTrace(); } }

2、模板与相关实体类的转换如下:

import java.io.IOException; import java.util.HashMap; import java.util.Map; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; import com.ips.esb.model.EsbMonitor; public class XmlContentUtil { /** * Log转化为ES标准数据 * @param object 要转化的实体类Monitor * @return */ public static XContentBuilder getXContentBuilder(Object object){ if(object.getClass().getName().contains("Monitor")){ return getEsbMonitorXContentBuilder((Monitor)object); } return null; } /** * Monitor转化为ES标准数据 * @param monitor * @return */ public static XContentBuilder getEsbMonitorXContentBuilder(Monitor monitor) { XContentBuilder xContentBuilder = null; try { xContentBuilder = XContentFactory.jsonBuilder().startObject()// 标识开始设置值 .field("uuid", monitor.getUuid()) .field("sys_id", monitor.getSysId()) .field("call_sys_id", monitor.getCallSysId()) .field("server_ip", monitor.getServerIp()) .field("remote_ip", monitor.getRemoteIp()) .field("pub_item_name", monitor.getPubItemName()) .field("parent_pub_item_name", monitor.getParentPubItemName()) .field("session_id", monitor.getSessionId()) .field("order_num", monitor.getOrderNum()) .field("monitor_id", monitor.getMonitorId()) .field("start_time", monitor.getStartTime()) .field("duration", monitor.getDuration()) .field("status", monitor.getStatus()) .field("result_code", monitor.getResultCode()) .field("result_desc", monitor.getResultDesc()) .field("data_size_in", monitor.getDataSizeIn()) .field("data_size_out", monitor.getDataSizeOut()) .field("gateway_code", monitor.getGatewayCode()) .field("token_id", monitor.getTokenId()) .field("client_id", monitor.getClientId()) .field("server_id", monitor.getServerId()) .endObject(); } catch (Exception e) { e.printStackTrace(); } return xContentBuilder; } /** * 生成ES标准数据 * @param object * @return * @throws IOException */ @SuppressWarnings("rawtypes") public static XContentBuilder getXContentBuilderMapping(Class clazz) throws IOException{ if(clazz.getName().contains("Monitor")){ return XmlContentUtil.getEsbMonitorXContentBuilderMapping(); } return null; } /** * 生成Monitor的ES标准数据模板 * @return * @throws IOException */ public static XContentBuilder getEsbMonitorXContentBuilderMapping() throws IOException { Map keyword = new HashMap(); keyword.put("type", "keyword"); keyword.put("ignore_above", 256); XContentBuilder mapping = XContentFactory.jsonBuilder() .startObject()//标识开始设置值 .startObject("properties") .startObject("@timestamp").field("type","date").endObject() .startObject("@version").field("type","text").startObject("fields").field("keyword",keyword).endObject().endObject() .startObject("uuid").field("type","text").startObject("fields").field("keyword",keyword).endObject().endObject() .startObject("sys_id").field("type","text").startObject("fields").field("keyword",keyword).endObject().endObject() .startObject("call_sys_id").field("type","text").startObject("fields").field("keyword",keyword).endObject().endObject() .startObject("server_ip").field("type","text").startObject("fields").field("keyword",keyword).endObject().endObject() .startObject("remote_ip").field("type","text").startObject("fields").field("keyword",keyword).endObject().endObject() .startObject("pub_item_name").field("type","text").startObject("fields").field("keyword",keyword).endObject().endObject() .startObject("parent_pub_item_name").field("type","text").startObject("fields").field("keyword",keyword).endObject().endObject() .startObject("session_id").field("type","text").startObject("fields").field("keyword",keyword).endObject().endObject() .startObject("order_num").field("type","integer").endObject() .startObject("monitor_id").field("type","text").startObject("fields").field("keyword",keyword).endObject().endObject() .startObject("start_time").field("type","date").endObject() .startObject("duration").field("type","long").endObject() .startObject("status").field("type","text").startObject("fields").field("keyword",keyword).endObject().endObject() .startObject("result_code").field("type","text").startObject("fields").field("keyword",keyword).endObject().endObject() .startObject("result_desc").field("type","text").startObject("fields").field("keyword",keyword).endObject().endObject() .startObject("data_size_in").field("type","long").endObject() .startObject("data_size_out").field("type","long").endObject() .startObject("gateway_code").field("type","text").startObject("fields").field("keyword",keyword).endObject().endObject() .startObject("token_id").field("type","text").startObject("fields").field("keyword",keyword).endObject().endObject() .startObject("client_id").field("type","text").startObject("fields").field("keyword",keyword).endObject().endObject() .startObject("server_id").field("type","text").startObject("fields").field("keyword",keyword).endObject().endObject() .endObject().endObject(); return mapping; } }

3、Monitor实体类数据结构如下:

import java.io.Serializable; import java.text.DateFormat; import java.text.SimpleDateFormat; import java.util.Date; public class EsbMonitor implements Serializable { private String uuid; private String sysId; private String callSysId; private String serverIp; private String remoteIp; private String pubItemName; private String parentPubItemName; private String sessionId; private Integer orderNum; private String monitorId; private Date startTime; private Long duration; private String status; private String resultCode; private String resultDesc; private Long dataSizeIn; private Long dataSizeOut; private String gatewayCode; private String tokenId; private String clientId; private String serverId; } 四、向ES中写入数据 /** * 向指定索引中批量插入数据 * @param * @param indexName 索引名称 * @param logList Log 的List集合 * @return */ @SuppressWarnings("rawtypes") public void addLogListIntoES(String indexName,List logList, Class clazz){ TransportClient client = this.getClient(); try { BulkRequestBuilder brBulider = client.prepareBulk(); initIndex(client,indexName,clazz); //初始化索引 IndexRequest request = null; for (Object log : logList) { request = client.prepareIndex(indexName,"doc") .setSource(XmlContentUtil.getXContentBuilder(log)).request(); brBulider.add(request); } BulkResponse bulkResponse = brBulider.execute().actionGet(); if(bulkResponse.hasFailures()){ throw new RuntimeException("日志写入失败"); } }catch(Exception e){ e.printStackTrace(); } } 五、从ES中读取数据

1、跟查询数据密切相关的就是查询条件的添加了,下面代码中说明了如何创建查询条件以及向查询语句中添加查询条件以获取目标数据:

import java.io.IOException; import java.util.ArrayList; import java.util.Date; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder; import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest; import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest; import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchType; import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.index.query.*; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHits; import org.elasticsearch.search.aggregations.Aggregation; import org.elasticsearch.search.aggregations.AggregationBuilders; import org.elasticsearch.search.aggregations.Aggregations; import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder; import com.google.gson.Gson; /** * @author allen * @date 2019/11/28-15:01 */ public class SearchUtil { private List aggregationBuilders = null; private BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery(); public SearchUtil(){ aggregationBuilders = new ArrayList(); } /** * 添加查询参数(查询类型为And) * @param key * @param value */ public void setParamAnd(String key, String value){ boolQueryBuilder.filter(QueryBuilders.termQuery(key, value)); } /** * 添加模糊查询参数(查询类型为And) * @param key * @param value */ public void setParamLike(String key, String value){ boolQueryBuilder.filter(QueryBuilders.wildcardQuery(key, "*"+value+"*")); } /** * *添加模糊查询参数(查询类型为And)(无分词查询) */ public void setParamAndNotPhrase(String key, String value) { this.boolQueryBuilder.filter(QueryBuilders.termQuery(key + ".keyword", value)); } /** * 添加查询参数(查询类型为Or) * @param key * @param value */ public void setParamOr(String key, String value){ boolQueryBuilder.should(QueryBuilders.termQuery(key,value)); } /** * Date类型查询参数及范围设置 * @param key * @param begin * @param end */ public void setDateRange(String key, Date begin, Date end){ String beginStr = DateUtil.getSub8DateStr(begin); String endStr = DateUtil.getSub8DateStr(end); setRange(key, beginStr, endStr); } /** * Date类型查询参数及范围设置 * @param key * @param begin * @param end */ public void setRange(String key, String begin, String end){ boolQueryBuilder.must(QueryBuilders.rangeQuery(key).from(begin).to(end)); } /** * 添加聚合查询条件 * @param key */ public void setAggregationParam(String key){ aggregationBuilders.add(AggregationBuilders.terms(key).field(key + ".keyword")); } /** * 获取ES集群下的所有索引名称 * * @return */ public Set getAllIndex() { TransportClient client = getClient(); Set indexSet = null; try { ActionFuture isr = client.admin().indices().stats(new IndicesStatsRequest().all()); indexSet = isr.actionGet().getIndices().keySet(); }catch(Exception e){ e.printStackTrace(); } return indexSet; } /** * 获取指定索引下的数据量 * * @param indexName * @return */ public long getCountByIndex(String indexName) { TransportClient client = getClient(); long totalHites = 0l; try { SearchRequestBuilder searchRequestBuilder = client.prepareSearch(indexName).setTypes("doc"); searchRequestBuilder.setQuery(boolQueryBuilder); totalHites = searchRequestBuilder.get().getHits().getTotalHits(); }catch(Exception e){ e.printStackTrace(); } return totalHites; } /** * 聚合检索 * * @param indexName * @return */ public Map getResultByAggreation(String indexName) { TransportClient client = getClient(); try { SearchRequestBuilder searchRequestBuilder = client.prepareSearch(indexName).setTypes("doc"); //添加查询条件 searchRequestBuilder.setQuery(boolQueryBuilder); for (TermsAggregationBuilder termsAggregationBuilder : aggregationBuilders) { searchRequestBuilder.addAggregation(termsAggregationBuilder); } SearchResponse actionGet = searchRequestBuilder.execute().actionGet(); Aggregations aggregations = actionGet.getAggregations(); Map asMap = aggregations.getAsMap(); return asMap; }catch(Exception e){ e.printStackTrace(); return null; } } /** * 根据日志uuid查询单条记录 * @param indexName * @return */ public Object getByUuid(String indexName, Class clazz) { TransportClient client = new SearchUtil().getClient(); Object logObj = null; try { if(!existIndex(client,indexName)){ return null; } SearchRequestBuilder searchRequestBuilder = client.prepareSearch(indexName) .setSearchType(SearchType.DEFAULT) .setSize(10) .setScroll(new TimeValue(1000)); //添加查询条件 searchRequestBuilder.setQuery(boolQueryBuilder); SearchResponse searchResponse = searchRequestBuilder.execute().actionGet(); SearchHits hitsFirst = searchResponse.getHits(); Gson gson = new Gson(); for (SearchHit hit : hitsFirst) { logObj = gson.fromJson(hit.getSourceAsString(), clazz); break; } } catch (Exception e) { e.printStackTrace(); } return logObj; } /** * 分页查询 * @param indexName * @return */ @SuppressWarnings({ "rawtypes", "unchecked" }) public Page pageByIndex(String indexName, Class clazz, Page page) { List logList = new CopyOnWriteArrayList(); TransportClient client = new SearchUtil().getClient(); try { if(!existIndex(client,indexName)){ return page; } int pageSize = page.getPageSize(); int pageNo = page.getCurrentPage(); SearchRequestBuilder searchRequestBuilder = client.prepareSearch(new String[]{indexName}).setSearchType(SearchType.DEFAULT).setFrom((pageNo - 1) * pageSize).setSize(pageSize); searchRequestBuilder.setQuery(this.boolQueryBuilder); SearchResponse searchResponse = (SearchResponse)searchRequestBuilder.execute().actionGet(); SearchHits hitsFirst = searchResponse.getHits(); Gson gson = new Gson(); BaseLogObj logObj = null; Iterator var13 = hitsFirst.iterator(); while(var13.hasNext()) { SearchHit hit = (SearchHit)var13.next(); logObj = (BaseLogObj)gson.fromJson(hit.getSourceAsString(), clazz); logList.add(logObj.convertToLog()); } long maxNum = this.getCountByIndex(indexName); int maxPage = (int)maxNum / pageSize; int totalPages = 0; if (maxNum % (long)pageSize == 0L) { totalPages = maxPage; } else { ++maxPage; totalPages = maxPage; } page.setTotalPages(totalPages); page.setTotalRows((int)maxNum); } catch (Exception e) { e.printStackTrace(); } page.setResults(logList); return page; } /** * 查询 List集合 * @param indexName * @return */ public List listByIndex(String indexName, Class clazz) { List logList = new CopyOnWriteArrayList(); TransportClient client = new SearchUtil().getClient(); try { if(!existIndex(client,indexName)){ return logList; } SearchRequestBuilder searchRequestBuilder = client.prepareSearch(indexName) .setSearchType(SearchType.DEFAULT) .setSize(10) .setScroll(new TimeValue(1000)); //添加查询条件 searchRequestBuilder.setQuery(boolQueryBuilder); SearchResponse searchResponse = searchRequestBuilder.execute().actionGet(); SearchHits hitsFirst = searchResponse.getHits(); Gson gson = new Gson(); BaseLogObj logObj = null; for (SearchHit hit : hitsFirst) { logObj = (BaseLogObj)gson.fromJson(hit.getSourceAsString(), clazz); logList.add(logObj.convertToLog()); } //获取总数量 long maxNum = searchResponse.getHits().getTotalHits(); int maxPage = (int) maxNum / 10;//计算总页数 for (int i = 1; i


【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

    专题文章
      CopyRight 2018-2019 实验室设备网 版权所有