Selaa lähdekoodia

实现influxdb读写

欧阳劲驰 2 kuukautta sitten
vanhempi
commit
3305245992

+ 6 - 0
pom.xml

@@ -83,6 +83,12 @@
             <!--<scope>runtime</scope>-->
         </dependency>
 
+        <!--influxdb-->
+        <dependency>
+            <groupId>org.influxdb</groupId>
+            <artifactId>influxdb-java</artifactId>
+        </dependency>
+
         <!-- spring-security 和 jwt -->
         <dependency>
             <groupId>org.springframework.boot</groupId>

+ 21 - 0
src/main/java/com/shkpr/service/alambizplugin/annotation/InfluxDbMapping.java

@@ -0,0 +1,21 @@
+package com.shkpr.service.alambizplugin.annotation;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * influx字段映射
+ *
+ * @author 欧阳劲驰
+ * @since 0.0.1
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.FIELD)
+public @interface InfluxDbMapping {
+    /**
+     * @return influx字段
+     */
+    String value();
+}

+ 35 - 0
src/main/java/com/shkpr/service/alambizplugin/commproperties/InfluxDbProperties.java

@@ -0,0 +1,35 @@
+package com.shkpr.service.alambizplugin.commproperties;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+
+/**
+ * influxdb属性
+ * <p>spring自动装配有同名bean,务必不要写同名字段</p>
+ *
+ * @author 欧阳劲驰
+ * @since 0.0.1-dev
+ */
+@Getter
+@AllArgsConstructor
+public class InfluxDbProperties {
+    /**
+     * 连接地址
+     */
+    private String url;
+
+    /**
+     * 用户名
+     */
+    private String user;
+
+    /**
+     * 密码
+     */
+    private String password;
+
+    /**
+     * 数据库
+     */
+    private String database;
+}

+ 217 - 0
src/main/java/com/shkpr/service/alambizplugin/commtools/InfluxDbUtil.java

@@ -0,0 +1,217 @@
+package com.shkpr.service.alambizplugin.commtools;
+
+import com.global.base.log.LogLevelFlag;
+import com.global.base.log.LogPrintMgr;
+import com.shkpr.service.alambizplugin.annotation.InfluxDbMapping;
+import com.shkpr.service.alambizplugin.commproperties.InfluxDbProperties;
+import com.shkpr.service.alambizplugin.constants.InfluxdbMetadata;
+import com.shkpr.service.alambizplugin.constants.LogFlagBusiType;
+import lombok.extern.slf4j.Slf4j;
+import org.influxdb.InfluxDB;
+import org.influxdb.InfluxDBException;
+import org.influxdb.dto.BatchPoints;
+import org.influxdb.dto.Point;
+import org.influxdb.dto.Query;
+import org.influxdb.dto.QueryResult;
+import org.springframework.stereotype.Component;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.time.LocalDateTime;
+import java.time.ZonedDateTime;
+import java.util.*;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * influxdb工具类
+ *
+ * @author 李兴
+ * @since 0.0.1-dev
+ **/
+@Component
+@Slf4j
+public class InfluxDbUtil {
+    /**
+     * log
+     */
+    private static final String mStrClassName = "InfluxDbUtil";
+    private static final String mBizType = LogFlagBusiType.BUSI_PIPE_BURST.toStrValue();
+    final
+    InfluxDB influxDb;
+    final
+    InfluxDbProperties properties;
+
+    public InfluxDbUtil(InfluxDbProperties properties, InfluxDB influxDb) {
+        this.influxDb = influxDb;
+        this.properties = properties;
+        try {
+            //查询数据库信息
+            QueryResult queryResult = influxDb.query(new Query(InfluxdbMetadata.Command.SHOW_DATABASE, null));
+            List<String> databases = getValues(queryResult).stream()
+                    .map(database -> database.get(0).toString())
+                    .collect(Collectors.toList());
+            //数据库不存在,则创建数据库
+            if (databases.isEmpty() || !databases.contains(properties.getDatabase()))
+                influxDb.query(new Query(InfluxdbMetadata.Command.CREATE_DATABASE + properties.getDatabase(), null));
+        } catch (InfluxDBException e) {
+            LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR, mBizType, mStrClassName
+                    , String.format("尝试初始化数据库失败 error:%s", e)
+            );
+        }
+
+        //设置要使用的数据库
+        influxDb.setDatabase(properties.getDatabase());
+    }
+
+    /**
+     * 获取series
+     *
+     * @param queryResult 查询结果
+     * @return series
+     */
+    public static QueryResult.Series getSeries(QueryResult queryResult) {
+        if (queryResult == null) return null;
+
+        //获取结果集合
+        List<QueryResult.Result> results = queryResult.getResults();
+        if (results == null || results.isEmpty()) return null;
+
+        //获取第一个结果
+        QueryResult.Result result = results.get(0);
+        if (result == null) return null;
+
+        //获取series
+        List<QueryResult.Series> series = result.getSeries();
+        if (series == null || series.isEmpty()) return null;
+
+        //获取第一个series
+        return series.get(0);
+    }
+
+    /**
+     * 获取值集合
+     *
+     * @param queryResult 查询结果
+     * @return 值集合
+     */
+    public static List<List<Object>> getValues(QueryResult queryResult) {
+        //获取series
+        QueryResult.Series firstSeries = getSeries(queryResult);
+        if (firstSeries == null) return Collections.emptyList();
+
+        //获取值集合
+        List<List<Object>> values = firstSeries.getValues();
+        return values != null ? values : Collections.emptyList();
+    }
+
+    /**
+     * 插入
+     *
+     * @param point 点
+     * @return 插入状态
+     */
+    public Boolean insert(Point point) {
+        try {
+            influxDb.write(point);
+            return true;
+        } catch (Exception e) {
+            LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR, mBizType, mStrClassName
+                    , String.format("插入InfluxDb失败 error:%s", e)
+            );
+            return false;
+        }
+    }
+
+    /**
+     * 批量插入
+     *
+     * @param points 批量点
+     * @return 插入状态
+     */
+    public Boolean insertBatch(BatchPoints points) {
+        try {
+            influxDb.write(points);
+            return true;
+        } catch (Exception e) {
+            LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR, mBizType, mStrClassName
+                    , String.format("插入InfluxDb失败 error:%s", e)
+            );
+            return false;
+        }
+    }
+
+    /**
+     * 查询
+     *
+     * @param sql   sql
+     * @param clazz 实体类
+     * @param <E>   实体类类型
+     * @return 实体类集合
+     */
+    public <E> List<E> query(String sql, Class<E> clazz) {
+        //执行查询
+        QueryResult queryResult = influxDb.query(new Query(sql, properties.getDatabase()));
+        QueryResult.Series series = getSeries(queryResult);
+        if (series == null) return Collections.emptyList();
+        //获取字段
+        List<String> columns = series.getColumns();
+        //字段映射
+        Map<Integer, Field> fieldMap = Arrays.stream(clazz.getDeclaredFields())
+                //过滤需要导出的字段
+                .filter(f -> f.isAnnotationPresent(InfluxDbMapping.class))
+                //设置字段公开
+                .peek(f -> f.setAccessible(true))
+                .collect(Collectors.toMap(
+                        f -> {
+                            //获取映射值
+                            String mapping = f.getAnnotation(InfluxDbMapping.class).value();
+                            //获取对应的索引
+                            return IntStream.range(0, columns.size())
+                                    //过滤相同的值
+                                    .filter(index -> columns.get(index).equals(mapping))
+                                    .findFirst().orElse(-1);
+                        }, Function.identity(),
+                        (it1, it2) -> it2,
+                        HashMap::new
+                ));
+        //获取值
+        List<List<Object>> values = series.getValues();
+        //解析数据
+        List<E> dates = new ArrayList<>();
+        for (List<Object> value : values) {
+            try {
+                //实列化数据
+                E data = clazz.getDeclaredConstructor().newInstance();
+                //遍历字段
+                for (Map.Entry<Integer, Field> fieldEntry : fieldMap.entrySet()) {
+                    //跳过未找到的字段
+                    if (fieldEntry.getKey() == -1) continue;
+                    //获取项
+                    Object item = value.get(fieldEntry.getKey());
+                    //设置字段
+                    if (fieldEntry.getValue().getType().equals(LocalDateTime.class))
+                        fieldEntry.getValue().set(data,
+                                item != null ? ZonedDateTime.parse(item.toString())
+                                        .withZoneSameInstant(TimeZone.getDefault().toZoneId())
+                                        .toLocalDateTime()
+                                        : null);
+                    else if (fieldEntry.getValue().getType().equals(Double.class)) {
+                        fieldEntry.getValue().set(data, item != null ? Double.parseDouble(item.toString()) : null);
+                    } else fieldEntry.getValue().set(data, item);
+                }
+                //存入数据
+                dates.add(data);
+            } catch (InstantiationException | IllegalAccessException | InvocationTargetException |
+                     NoSuchMethodException e) {
+                LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR, mBizType, mStrClassName
+                        , String.format("构建数据失败 error:%s", e)
+                );
+            }
+        }
+
+        return dates;
+    }
+
+}

+ 75 - 0
src/main/java/com/shkpr/service/alambizplugin/configuration/InfluxDbConfiguration.java

@@ -0,0 +1,75 @@
+package com.shkpr.service.alambizplugin.configuration;
+
+import com.shkpr.service.alambizplugin.commproperties.InfluxDbProperties;
+import okhttp3.OkHttpClient;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.influxdb.InfluxDB;
+import org.influxdb.impl.InfluxDBImpl;
+import org.springframework.beans.factory.ObjectProvider;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.autoconfigure.influx.InfluxDbOkHttpClientBuilderProvider;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+import java.time.Duration;
+
+@Configuration
+public class InfluxDbConfiguration {
+    private static final Log logger = LogFactory.getLog(InfluxDbConfiguration.class);
+
+    private final OkHttpClient.Builder builder;
+
+    @Value("${spring.influx.url:}")
+    private String url = "";
+    @Value("${spring.influx.user:}")
+    private String user = "";
+    @Value("${spring.influx.password:}")
+    private String password = "";
+    @Value("${spring.influx.database:}")
+    private String database = "";
+    @Value("${spring.influx.read-timeout:}")
+    private String readTimeout = "";
+
+    public InfluxDbConfiguration(ObjectProvider<InfluxDbOkHttpClientBuilderProvider> builder,
+                                 ObjectProvider<OkHttpClient.Builder> deprecatedBuilder) {
+        this.builder = determineBuilder(builder.getIfAvailable(),
+                deprecatedBuilder.getIfAvailable());
+    }
+
+    private static OkHttpClient.Builder determineBuilder(
+            InfluxDbOkHttpClientBuilderProvider builder,
+            OkHttpClient.Builder deprecatedBuilder) {
+        if (builder != null) {
+            return builder.get();
+        } else if (deprecatedBuilder != null) {
+            logger.warn(
+                    "InfluxDB client customizations using a OkHttpClient.Builder is deprecated, register a "
+                            + InfluxDbOkHttpClientBuilderProvider.class.getSimpleName()
+                            + " bean instead");
+            return deprecatedBuilder;
+        }
+        return new OkHttpClient.Builder();
+    }
+
+    /**
+     * influxDb属性
+     *
+     * @return influxDb属性
+     */
+    @Bean
+    public InfluxDbProperties influxDbProperties() {
+        return new InfluxDbProperties(url, user, password, database);
+    }
+
+    /**
+     * influxDb客户端
+     *
+     * @return influxDb客户端
+     */
+    @Bean
+    public InfluxDB influxDB() {
+        this.builder.readTimeout(Duration.ofMillis(Long.parseLong(readTimeout)));
+        return new InfluxDBImpl(url, user, password, this.builder);
+    }
+}

+ 30 - 0
src/main/java/com/shkpr/service/alambizplugin/constants/InfluxdbMetadata.java

@@ -0,0 +1,30 @@
+package com.shkpr.service.alambizplugin.constants;
+
+/**
+ * influxdb元数据
+ *
+ * @author 欧阳劲驰
+ * @since 0.0.1-dev
+ */
+public interface InfluxdbMetadata {
+    /**
+     * 指令
+     */
+    interface Command {
+        /**
+         * 查询数据库
+         */
+        String SHOW_DATABASE = "SHOW DATABASES";
+        /**
+         * 创建数据库
+         */
+        String CREATE_DATABASE = "CREATE DATABASE ";
+    }
+
+    /**
+     * SQL
+     */
+    interface SQL {
+
+    }
+}