12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
7127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391 |
- package com.shkpr.service.aimodelpower.bizmgr;
- import com.alibaba.fastjson.JSONObject;
- import com.global.base.log.LogLevelFlag;
- import com.global.base.log.LogPrintMgr;
- import com.global.base.tools.FastJsonUtil;
- import com.shkpr.service.aimodelpower.commtools.TimeTool;
- import com.shkpr.service.aimodelpower.dbdao.DBMgrProxy;
- import com.shkpr.service.aimodelpower.dbdao.shizilaishuiDataSource.service.intef.WaterCollecationService;
- import com.shkpr.service.aimodelpower.dbdao.shizilaishuiDataSource.service.intef.WaterTapWaterService;
- import com.shkpr.service.aimodelpower.dto.TraceRunnable;
- import com.shkpr.service.aimodelpower.globalmgr.ThreadTaskMgr;
- import com.shkpr.service.aimodelpower.globalmgr.TraceLogMgr;
- import org.springframework.util.CollectionUtils;
- import org.springframework.util.ObjectUtils;
- import java.io.FileInputStream;
- import java.io.IOException;
- import java.io.InputStream;
- import java.time.Duration;
- import java.time.LocalDate;
- import java.time.LocalDateTime;
- import java.time.LocalTime;
- import java.time.format.DateTimeFormatter;
- import java.util.*;
- import java.util.concurrent.CountDownLatch;
- import java.util.stream.Collectors;
- /**
- * @ClassName KprAimTapWaterBizFun
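- * @Description: Business functions for tap-water data: temporary 15-minute forecast interpolation and per-district water-usage calculation.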
- * @Author LX
- * @Date 2024/5/22
- * @Version V1.0
- **/
- public class KprAimTapWaterBizFun {
- private static final String MSG_SUCCESS = "success."; // Generic success message text.
- private static final String MSG_FAILED = "failed."; // Generic failure message text.
- private static final String mStrClassName = "KprAimTapWaterBizFun"; // Class name handed to LogPrintMgr.printLogMsg as the log source.
- private static final String EMPTY_NULL = "NULL"; // Literal "NULL" placeholder; its use is not visible in this part of the file.
- public static WaterTapWaterService getWaterTapWaterApi(){
- return DBMgrProxy.getInstance().applyWaterTapWaterService();
- }
- public static WaterCollecationService getWaterCollecationApi(){
- return DBMgrProxy.getInstance().applyWaterCollecationService();
- }
- static DateTimeFormatter formater = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); // Class-level "yyyy-MM-dd HH:mm:ss" formatter; several methods below declare a local variable with the same name, which shadows this field.
- //TODO 15分间隔的预测插值(做假暂时用,不用时注释)
- //TODO 思路:等预测小时的原程序完成后, 原预测表中为小时预测数据,分钟字段为空,将预测表中的数据进行二次清洗然后插入新预测表
- //TODO beforeDays为往后推清洗多少天 n-1为逻辑, 例如传1就是0天当天
- public static void yuceZuojia(int beforeDays){
- try{
- LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO,mStrClassName, mStrClassName,"预测15分钟插值开始时间:"+TimeTool.convertUTC2DateStr(TimeTool.getCurMsUTC(),TimeTool.TIMESTAMP_FORMAT));
- LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO,mStrClassName, mStrClassName,"预测15分钟插值进行中:......");
- List<Map<String, Object>> orgConfig = getWaterCollecationApi().getOrgConfig(false, 0, 0, " AND org_name != '井口水厂' AND org_name != '丰收坝水厂' AND org_name != '沙坪坝水厂' AND org_name != '北碚红工水厂' AND org_name != '大溪沟水厂' ");
- final CountDownLatch latch = new CountDownLatch(orgConfig.size());
- DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd");
- LocalDateTime startDate = LocalDateTime.now().withMinute(0).withSecond(0);
- for (Map<String,Object> org:orgConfig) {
- ThreadTaskMgr.runTask(new TraceRunnable(TraceLogMgr.getTraceId()) {
- @Override
- public void function() {
- try {
- for (int dayNum=0;dayNum<beforeDays;dayNum++) {
- String dateStr = startDate.minusDays(-dayNum).format(formatter);
- List<Map<String, Object>> resHourList = getWaterCollecationApi().getTbMHourwater(false,
- 20, 0, " AND \"orgId\" = '" + org.get("org_id") + "' AND \"Date\" = '" + dateStr + "' ORDER BY \"Date\",\"Hour\" ASC ");
- processWaterData(resHourList);
- }
- latch.countDown();
- }catch(Exception ex){
- ex.printStackTrace();
- }
- }
- });
- }
- latch.await();
- LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO,mStrClassName, mStrClassName,"预测15分钟插值已结束");
- }catch(Exception ex){
- LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR
- , mStrClassName
- , mStrClassName
- , String.format("Batch yuceZuojia ERROR:{%s} "
- , ex.getLocalizedMessage()));
- }
- }
- private static DateTimeFormatter dateFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd");
- private static DateTimeFormatter timeFormatter = DateTimeFormatter.ofPattern("HH:mm:ss");
- public static void processWaterData(List<Map<String, Object>> resHourList) {
- List<List<Map<String, Object>>> trendGroups = new ArrayList<>();
- List<Map<String, Object>> currentGroup = new ArrayList<>();
- for (int i = 0; i < resHourList.size(); i++) {
- Map<String, Object> currentMap = resHourList.get(i);
- double currentHourData = getDoubleValue(currentMap, "HourForecastActualWaterSupply");
- if (i == 0) {
- currentGroup.add(currentMap);
- } else {
- double previousHourData = getDoubleValue(resHourList.get(i - 1), "HourForecastActualWaterSupply");
- boolean isIncreasing = currentHourData > previousHourData;
- if (currentGroup.size() > 0) {
- double lastGroupData = getDoubleValue(currentGroup.get(currentGroup.size() - 1), "HourForecastActualWaterSupply");
- boolean wasIncreasing = lastGroupData < currentHourData;
- if (isIncreasing != wasIncreasing) {
- trendGroups.add(new ArrayList<>(currentGroup));
- currentGroup.clear();
- }
- }
- currentGroup.add(currentMap);
- }
- }
- if (!currentGroup.isEmpty()) {
- trendGroups.add(currentGroup);
- }
- for (List<Map<String, Object>> group : trendGroups) {
- for (int i = 0; i < group.size() - 1; i++) {
- Map<String, Object> currentMap = group.get(i);
- double currentHourData = getDoubleValue(currentMap, "HourForecastActualWaterSupply");
- double nextHourData = getDoubleValue(group.get(i + 1), "HourForecastActualWaterSupply");
- boolean isIncreasing = nextHourData > currentHourData;
- double[] splitData = splitHourData(currentHourData, isIncreasing);
- // 步骤2:平滑处理
- double alpha = 0.3; // 平滑系数
- double[] smoothedData = exponentialSmoothing(splitData,alpha);
- // 解析日期字符串
- LocalDate date = LocalDate.parse(currentMap.get("Date").toString(), DateTimeFormatter.ISO_LOCAL_DATE);
- // 解析时间字符串
- LocalTime time = LocalTime.parse(currentMap.get("Hour").toString(), DateTimeFormatter.ISO_LOCAL_TIME);
- LocalDateTime dateTime = LocalDateTime.of(date, time).minusMinutes(45);
- for (int j = 0; j < smoothedData.length; j++) {
- Map<String, Object> newMap = new HashMap<>(currentMap);
- newMap.put("HourForecastActualWaterSupply", smoothedData[j]);
- LocalDateTime quarterHourTime = dateTime.plusMinutes(j * 15);
- newMap.put("Date", quarterHourTime.format(dateFormatter));
- newMap.put("Hour", quarterHourTime.format(timeFormatter));
- // Here you would query the database and insert/update data
- // For example:
- double originalValue = queryOriginalValue(quarterHourTime);
- newMap.put("HourActualWaterSupply", originalValue);
- newMap.put("LastModifyTime", LocalDateTime.now().format(DateTimeFormatter.ofPattern(TimeTool.TIMESTAMP_FORMAT)));
- // Retain only specific keys
- Map<String, Object> filteredMap = filterMap(newMap, "Date", "Hour", "HourForecastActualWaterSupply", "HourActualWaterSupply", "LastModifyTime", "orgId");
- // Insert or update the database
- // int insertRes = insertOrUpdateDatabase(filteredMap);
- // Handle the result if needed
- Integer insertRes = getWaterTapWaterApi().insertOrUpdateTbmHourWaterNew(filteredMap);
- if (insertRes < 1) {
- LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR
- , mStrClassName
- , mStrClassName
- , String.format("Batch yuceZuojia ERROR:{%s} ",
- "新增或修改失败"));
- }
- }
- }
- }
- }
- private static double getDoubleValue(Map<String, Object> map, String key) {
- Object value = map.get(key);
- return value != null ? Double.parseDouble(value.toString()) : 0.0;
- }
- //拆分函数
- private static double[] splitHourData(double hourData, boolean isIncreasing) {
- double[] splitData = new double[4];
- if (isIncreasing) {
- // 调整比例以实现更平滑的上升曲线
- splitData[0] = hourData * 0.24;
- splitData[1] = hourData * 0.26;
- splitData[2] = hourData * 0.25;
- splitData[3] = hourData * 0.25;
- } else {
- // 调整比例以实现更平滑的下降曲线
- splitData[0] = hourData * 0.25;
- splitData[1] = hourData * 0.25;
- splitData[2] = hourData * 0.26;
- splitData[3] = hourData * 0.24;
- }
- return splitData;
- }
- //平滑函数
- private static double[] smoothData(double[] data) {
- double[] smoothedData = new double[data.length];
- int windowSize = 3; // 移动平均窗口大小
- for (int i = 0; i < data.length; i++) {
- double sum = 0;
- int count = 0;
- for (int j = Math.max(0, i - windowSize / 2); j <= Math.min(data.length - 1, i + windowSize / 2); j++) {
- sum += data[j];
- count++;
- }
- smoothedData[i] = sum / count;
- }
- return smoothedData;
- }
- //指数平滑
- private static double[] exponentialSmoothing(double[] data, double alpha) {
- double[] smoothedData = new double[data.length];
- smoothedData[0] = data[0]; // 初始化第一个值
- for (int i = 1; i < data.length; i++) {
- smoothedData[i] = alpha * data[i] + (1 - alpha) * smoothedData[i - 1];
- }
- return smoothedData;
- }
- private static double queryOriginalValue(LocalDateTime dateTime) {
- // Simulate querying the database for the original value
- // Replace with actual database query
- return 100.0; // Placeholder value
- }
- private static Map<String, Object> filterMap(Map<String, Object> map, String... keys) {
- Map<String, Object> filteredMap = new LinkedHashMap<>();
- for (String key : keys) {
- if (map.containsKey(key)) {
- filteredMap.put(key, map.get(key));
- }
- }
- return filteredMap;
- }
- private static Map<String, List<String>> parseConfig(String configstr) {
- // Initialize the map to hold the parsed configuration
- Map<String, List<String>> configMap = new HashMap<>();
- // Get the configuration string
- String configString = configstr;
- if (configString != null) {
- // Split the configuration by different locations
- String[] locations = configString.split("},");
- for (String location : locations) {
- // Find the index of the opening brace
- int braceIndex = location.indexOf('{');
- if (braceIndex > 0) {
- // Extract the location name
- String locationName = location.substring(0,braceIndex).trim();
- // Extract the tags and split by comma
- String tagsString = location.substring(braceIndex + 1).replace("}", "").trim();
- List<String> tags = Arrays.asList(tagsString.split(","));
- // Add to the map
- configMap.put(locationName, tags);
- }
- }
- }
- return configMap;
- }
- //TODO 小时营业所片区用水量(自供+(供入-供出))
- //TODO beforHour是小时数,当前时间往前扣多少个小时 按15分钟刻度计算用水量
- public static void checkBusinessRecordAllData(int beforHour,String selfconfessStr,String supplyinStr,String confessStr){
- //TODO 检查小时用水量
- DateTimeFormatter formater = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
- try{
- LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO,mStrClassName, mStrClassName,"检查机制-计算营业所15分钟用水量开始时间:"+TimeTool.convertUTC2DateStr(TimeTool.getCurMsUTC(),TimeTool.TIMESTAMP_FORMAT));
- LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO,mStrClassName, mStrClassName,"检查机制-计算营业所15分钟用水量进行中:......");
- //获取配置关系
- //TODO 自供
- Map<String,List<String>> selfconfess = parseConfig(selfconfessStr);
- //TODO 供入
- Map<String,List<String>> supplyin = parseConfig(supplyinStr);
- //TODO 供出
- Map<String,List<String>> confess = parseConfig(confessStr);
- //TODO 根据当前配置信息item 查询远通数据中的历史数据
- //TODO 首先查询当前水厂设备的从昨天之后到得到数据的数据
- LocalDateTime today = LocalDateTime.now();
- LocalDateTime startDateTime = today.withMinute(0).withSecond(0).withNano(0).minusHours(beforHour);
- //TODO 需计算的循环天数
- //TODO 此循环天数每一天所查的是所有设备每小时数据合
- LocalDateTime newStartDateTime = startDateTime;
- String startDate = newStartDateTime.format(formater);
- String endDate = today.withMinute(1).withSecond(0).format(formater);
- final CountDownLatch latch = new CountDownLatch(selfconfess.keySet().size());
- List<String> closeTag = new ArrayList<>();
- closeTag.add("FSBSC_MAS.MAS.TOTALFLOWD");
- closeTag.add("fsbsc_mas.mas.totalflowg");
- for (String orgName:selfconfess.keySet()) {
- try {
- ThreadTaskMgr.runTask(new TraceRunnable(TraceLogMgr.getTraceId()) {
- @Override
- public void function() {
- for (int i = 0; i < Integer.valueOf(String.valueOf(Duration.between(startDateTime, today.withSecond(0)).toMinutes() / 15)); i++) {
- String startTime = newStartDateTime.minusMinutes(-(i * 15)).format(formater);
- String endTime = newStartDateTime.minusMinutes(-1).minusMinutes(-((i + 1) * 15)).format(formater);//查询时间加一分钟
- Map<String, Object> recordAllEntity = new HashMap<>();//需要添加的实体数据
- recordAllEntity.put("org_name", orgName);//水厂
- recordAllEntity.put("time", newStartDateTime.minusMinutes(-((i + 1) * 15)).format(formater));//采集时间(小时的最后时间)
- recordAllEntity.put("value", null);
- recordAllEntity.put("value_tag", "water");
- recordAllEntity.put("collcation_tag_array", "");
- //TODO 此循环计算该小时所有设备的用水量
- //TODO 先查各个营业所的自供值
- List<String> tagTags = selfconfess.get(orgName);//自供
- List<String> tagTags2 = supplyin.get(orgName);//供入
- List<String> tagTags3 = confess.get(orgName);//供出
- Double selfValue = null;
- Double supplyInValue = null;
- Double confessValue = null;
- try {
- selfValue = selfconfess(tagTags, startDate, endDate, startTime, endTime,orgName+"(自供)",closeTag);//自供
- supplyInValue = selfconfess(tagTags2, startDate, endDate, startTime, endTime,orgName+"(供入)",null);//供入
- confessValue = selfconfess(tagTags3, startDate, endDate, startTime, endTime,orgName+"(供出)",null);//供出
- } catch (Exception ex) {
- }
- Double value = null;//总计算值
- if (selfValue != null) {
- if (supplyInValue != null && confessValue == null) {
- value = selfValue + supplyInValue;
- } else if (supplyInValue != null && confessValue != null) {
- value = selfValue + supplyInValue - confessValue;
- } else if (supplyInValue == null && confessValue == null) {
- value = selfValue;
- } else if (supplyInValue == null && confessValue != null) {
- value = selfValue - confessValue;
- }
- }
- recordAllEntity.put("value", value);
- // System.out.println("营业所片区" + orgName + "值:" + value + ",采集时间:" + newStartDateTime.minusMinutes(-((i + 1) * 15)).format(formater));
- List<Map<String, Object>> queryWaterRecord = getWaterTapWaterApi().getWaterCollectionRecordAllListNew(1, 0,
- " WHERE org_name = '" + recordAllEntity.get("org_name")
- + "' AND time = '" + recordAllEntity.get("time") + "' AND value_tag = '" + recordAllEntity.get("value_tag") + "'");
- if (CollectionUtils.isEmpty(queryWaterRecord)) {
- // //TODO 说明不存在,进行插入
- if (!ObjectUtils.isEmpty(recordAllEntity.get("value"))) {
- int insertCode = getWaterTapWaterApi().insertWaterCollectionRecordAllNew(" (" +
- "'" + recordAllEntity.get("org_name") + "'," +
- "'" + recordAllEntity.get("time") + "'," +
- "'" + recordAllEntity.get("value") + "'," +
- "'" + recordAllEntity.get("value_tag") + "'," +
- "'" + recordAllEntity.get("collcation_tag_array") + "'" +
- ") ");
- if (insertCode < 0) {
- LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR
- , mStrClassName
- , mStrClassName
- , String.format("Batch initTapWaterDataThread 未成功:{%s} ",
- FastJsonUtil.toJSON(recordAllEntity)));
- }
- }
- } else {
- //TODO 说明存在,进行修改
- if (!ObjectUtils.isEmpty(recordAllEntity.get("value"))) {
- int updateCode = getWaterTapWaterApi().updateWaterCollectionRecordAllNew(String.valueOf(value), " WHERE " +
- "(" +
- " org_name = '" + recordAllEntity.get("org_name") + "' and" +
- " \"time\" = '" + recordAllEntity.get("time") + "' and" +
- " value_tag = '" + recordAllEntity.get("value_tag") + "'" +
- ") ");
- if (updateCode < 0) {
- LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR
- , mStrClassName
- , mStrClassName
- , String.format("Batch updateWaterCollectionRecordAll 未成功:{%s} ",
- FastJsonUtil.toJSON(recordAllEntity)));
- }
- }
- }
- }
- latch.countDown();
- }
- });
- }catch(Exception ex){
- }
- }
- latch.await();
- LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO,mStrClassName, mStrClassName,"检查机制-计算营业所15分钟用水量已完成");
- }catch(Exception ex){
- ex.printStackTrace();
- LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR
- , mStrClassName
- , mStrClassName
- , String.format("Batch checkBusinessRecordAllData ERROR:{%s} "
- , ex.getLocalizedMessage()));
- }
- }
- //TODO beforHour是小时数,当前时间往前扣多少个小时 按一小时刻度计算用水量
- public static void checkBusinessRecordOldAllData(int beforHour,String selfconfessStr,String supplyinStr,String confessStr){
- //TODO 检查小时用水量
- DateTimeFormatter formater = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
- try{
- LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO,mStrClassName, mStrClassName,"检查机制-计算营业所小时用水量开始时间:"+TimeTool.convertUTC2DateStr(TimeTool.getCurMsUTC(),TimeTool.TIMESTAMP_FORMAT));
- LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO,mStrClassName, mStrClassName,"检查机制-计算营业所小时用水量进行中:......");
- //获取配置关系
- //TODO 自供
- Map<String,List<String>> selfconfess = parseConfig(selfconfessStr);
- //TODO 供入
- Map<String,List<String>> supplyin = parseConfig(supplyinStr);
- //TODO 供出
- Map<String,List<String>> confess = parseConfig(confessStr);
- //TODO 根据当前配置信息item 查询远通数据中的历史数据
- //TODO 首先查询当前水厂设备的从昨天之后到得到数据的数据
- LocalDateTime today = LocalDateTime.now();
- LocalDateTime startDateTime = today.withMinute(0).withSecond(0).withNano(0).minusHours(beforHour);
- //TODO 需计算的循环天数
- //TODO 此循环天数每一天所查的是所有设备每小时数据合
- LocalDateTime newStartDateTime = startDateTime;
- String startDate = newStartDateTime.format(formater);
- String endDate = today.withMinute(1).withSecond(0).format(formater);
- List<String> closeTag = new ArrayList<>();
- closeTag.add("FSBSC_MAS.MAS.TOTALFLOWD");
- closeTag.add("fsbsc_mas.mas.totalflowg");
- final CountDownLatch latch = new CountDownLatch(selfconfess.keySet().size());
- for (String orgName:selfconfess.keySet()) {
- try {
- ThreadTaskMgr.runTask(new TraceRunnable(TraceLogMgr.getTraceId()) {
- @Override
- public void function() {
- for (int i = 0; i < Integer.valueOf(String.valueOf(Duration.between(startDateTime, today.withMinute(0).withSecond(0)).toHours())); i++) {
- String startTime = newStartDateTime.minusHours(-i).format(formater);
- String endTime = newStartDateTime.minusMinutes(-1).minusHours(-(i+1)).format(formater);//查询时间加一分钟
- Map<String, Object> recordAllEntity = new HashMap<>();//需要添加的实体数据
- recordAllEntity.put("org_name", orgName);//水厂
- recordAllEntity.put("time", newStartDateTime.minusHours(-(i+1)).format(formater));//采集时间(小时的最后时间)
- recordAllEntity.put("value", null);
- recordAllEntity.put("value_tag", "water");
- recordAllEntity.put("collcation_tag_array", "");
- //TODO 此循环计算该小时所有设备的用水量
- //TODO 先查各个营业所的自供值
- List<String> tagTags = selfconfess.get(orgName);//自供
- List<String> tagTags2 = supplyin.get(orgName);//供入
- List<String> tagTags3 = confess.get(orgName);//供出
- Double selfValue = null;
- Double supplyInValue = null;
- Double confessValue = null;
- try {
- selfValue = selfconfess(tagTags, startDate, endDate, startTime, endTime,orgName+"(自供)",closeTag);//自供
- supplyInValue = selfconfess(tagTags2, startDate, endDate, startTime, endTime,orgName+"(供入)",null);//供入
- confessValue = selfconfess(tagTags3, startDate, endDate, startTime, endTime,orgName+"(供出)",null);//供出
- } catch (Exception ex) {
- }
- Double value = null;//总计算值
- if (selfValue != null) {
- if (supplyInValue != null && confessValue == null) {
- value = selfValue + supplyInValue;
- } else if (supplyInValue != null && confessValue != null) {
- value = selfValue + supplyInValue - confessValue;
- } else if (supplyInValue == null && confessValue == null) {
- value = selfValue;
- } else if (supplyInValue == null && confessValue != null) {
- value = selfValue - confessValue;
- }
- }
- recordAllEntity.put("value", value);
- System.out.println("营业所片区" + orgName + "值:" + value + ",采集时间:" + newStartDateTime.minusMinutes(-((i + 1) * 15)).format(formater));
- List<Map<String, Object>> queryWaterRecord = getWaterTapWaterApi().getWaterCollectionRecordAllList(1, 0,
- " WHERE org_name = '" + recordAllEntity.get("org_name")
- + "' AND time = '" + recordAllEntity.get("time") + "' AND value_tag = '" + recordAllEntity.get("value_tag") + "'");
- if (CollectionUtils.isEmpty(queryWaterRecord)) {
- // //TODO 说明不存在,进行插入
- if (!ObjectUtils.isEmpty(recordAllEntity.get("value"))) {
- int insertCode = getWaterTapWaterApi().insertWaterCollectionRecordAll(" (" +
- "'" + recordAllEntity.get("org_name") + "'," +
- "'" + recordAllEntity.get("time") + "'," +
- "'" + recordAllEntity.get("value") + "'," +
- "'" + recordAllEntity.get("value_tag") + "'," +
- "'" + recordAllEntity.get("collcation_tag_array") + "'" +
- ") ");
- if (insertCode < 0) {
- LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR
- , mStrClassName
- , mStrClassName
- , String.format("Batch initTapWaterDataThread 未成功:{%s} ",
- FastJsonUtil.toJSON(recordAllEntity)));
- }
- }
- } else {
- //TODO 说明存在,进行修改
- if (!ObjectUtils.isEmpty(recordAllEntity.get("value"))) {
- int updateCode = getWaterTapWaterApi().updateWaterCollectionRecordAll(String.valueOf(value), " WHERE " +
- "(" +
- " org_name = '" + recordAllEntity.get("org_name") + "' and" +
- " \"time\" = '" + recordAllEntity.get("time") + "' and" +
- " value_tag = '" + recordAllEntity.get("value_tag") + "'" +
- ") ");
- if (updateCode < 0) {
- LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR
- , mStrClassName
- , mStrClassName
- , String.format("Batch updateWaterCollectionRecordAll 未成功:{%s} ",
- FastJsonUtil.toJSON(recordAllEntity)));
- }
- }
- }
- }
- latch.countDown();
- }
- });
- }catch(Exception ex){
- }
- }
- latch.await();
- LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO,mStrClassName, mStrClassName,"检查机制-计算营业所小时用水量已完成");
- }catch(Exception ex){
- ex.printStackTrace();
- LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR
- , mStrClassName
- , mStrClassName
- , String.format("Batch checkBusinessRecordAllData ERROR:{%s} "
- , ex.getLocalizedMessage()));
- }
- }
- //TODO 计算总值
- private static Double selfconfess(List<String> tagTags,String startDate,String endDate,String startTime,String endTime,String desc,List<String> closeTag) throws Exception{
- Double value = null;
- if (tagTags==null){
- return value;
- }
- for (String tagTag : tagTags) {
- // 定义字符串日期时间的格式
- DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
- // 解析字符串以创建 LocalDateTime 实例
- LocalDateTime dateTime = LocalDateTime.parse(endTime, formatter);
- if(closeTag!=null&&closeTag.size()>0){
- if(closeTag.contains(tagTag)){
- //如果存在就过滤 结束当前 循环
- continue;
- }
- }
- if (dateTime.getHour()==0&&dateTime.getMinute()==16&&("SS.SSCOM.F36:2".equals(tagTag) ||
- "SS.SSCOM.F36:5".equals(tagTag) ||
- "SS.SSCOM.F36:8".equals(tagTag))) {
- Integer itemCount = getWaterTapWaterApi().getTabWaterHistoryCount(
- " WHERE TAG_CODE = '" + tagTag + "' " +
- " and QCQUISITION_TIME >= to_date('" + startDate + "', 'yyyy-mm-dd hh24:mi:ss')" +
- " and QCQUISITION_TIME < to_date('" + endDate + "', 'yyyy-mm-dd hh24:mi:ss')");
- if(itemCount!=null&&itemCount>0) {
- List<Map<String, Object>> tapWaterHistoryList = getWaterTapWaterApi().getPageZILAISHUI_HISTORY2(itemCount, 0,
- " AND TAG_CODE = '" + tagTag + "' " +
- " and QCQUISITION_TIME >= to_date('" + startTime + "', 'yyyy-mm-dd hh24:mi:ss')" +
- " and QCQUISITION_TIME < to_date('" + endTime + "', 'yyyy-mm-dd hh24:mi:ss')" +
- " order by QCQUISITION_TIME");
- if (!CollectionUtils.isEmpty(tapWaterHistoryList) && tapWaterHistoryList.size() > 1) {
- if (value == null) {
- value = 0.00;
- }
- value += tapWaterHistoryList.get((tapWaterHistoryList.size() - 1)).get("VAL") != null ? Double.valueOf(tapWaterHistoryList.get((tapWaterHistoryList.size() - 1)).get("val").toString()) : null;
- }
- }
- System.out.println("设施名:"+desc+";时间范围:"+startTime+"--"+endTime+";设备标签:"+tagTag+";用水量:"+value);
- }else {
- Integer itemCount = getWaterTapWaterApi().getTabWaterHistoryCount(
- " WHERE TAG_CODE = '" + tagTag + "' " +
- " and QCQUISITION_TIME >= to_date('" + startDate + "', 'yyyy-mm-dd hh24:mi:ss')" +
- " and QCQUISITION_TIME < to_date('" + endDate + "', 'yyyy-mm-dd hh24:mi:ss')");
- if (itemCount != null && itemCount > 0) {
- List<Map<String, Object>> tapWaterHistoryList = getWaterTapWaterApi().getPageZILAISHUI_HISTORY2(itemCount, 0,
- " AND TAG_CODE = '" + tagTag + "' " +
- " and QCQUISITION_TIME >= to_date('" + startTime + "', 'yyyy-mm-dd hh24:mi:ss')" +
- " and QCQUISITION_TIME < to_date('" + endTime + "', 'yyyy-mm-dd hh24:mi:ss')" +
- " order by QCQUISITION_TIME");
- //TODO tapWaterHistoryList此集合为该设备该小时内的数据,取第一条和最后一条的相差值作为用水量
- if (!CollectionUtils.isEmpty(tapWaterHistoryList) && tapWaterHistoryList.size() > 1) {
- Double firstValue = tapWaterHistoryList.get(0).get("VAL") != null ? Double.valueOf(tapWaterHistoryList.get(0).get("val").toString()) : null;
- Double lastValue = tapWaterHistoryList.get((tapWaterHistoryList.size() - 1)).get("VAL") != null ? Double.valueOf(tapWaterHistoryList.get((tapWaterHistoryList.size() - 1)).get("val").toString()) : null;
- if (firstValue != null && lastValue != null) {
- //到此处是该小时一个设备的用水量已加上
- if (value == null) {
- value = 0.00;
- }
- value += Math.abs(lastValue - firstValue);
- }
- }
- System.out.println("设施名:"+desc+";时间范围:"+startTime+"--"+endTime+";设备标签:"+tagTag+";用水量:"+value);
- }
- }
- }
- // System.out.println("设施名:"+desc+";时间范围:"+startTime+"--"+endTime+";用水量:"+value+";总值:"+value);
- return value;
- }
- //TODO 15分钟 分区用水量 每天整点10分检查并写入,检查范围为当前时间到昨天23点
- public static void checkRecordAllDataBusinessFenqu(int beforHour){
- //TODO 检查小时用水量
- DateTimeFormatter formater = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
- LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO,mStrClassName, mStrClassName,"检查机制-计算15分钟用水量开始时间:"+TimeTool.convertUTC2DateStr(TimeTool.getCurMsUTC(),TimeTool.TIMESTAMP_FORMAT));
- LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO,mStrClassName, mStrClassName,"检查机制-计算15分钟用水量进行中:......");
- List<Map<String, Object>> configList = getWaterTapWaterApi().getWaterCollectionConfigList("WHERE 1=1 AND org_name not like '%营业所' AND org_name!='北碚水厂' AND org_name!='渝中区水厂'");
- if (!CollectionUtils.isEmpty(configList)) {
- //TODO 按照组织机构分组
- Map<Object, List<Map<String, Object>>> groupedData =
- configList.stream().collect(Collectors.groupingBy(item -> item.get("org_name")));
- final CountDownLatch latch = new CountDownLatch(groupedData.keySet().size());
- for (Object key:groupedData.keySet()){
- try {
- ThreadTaskMgr.runTask(new TraceRunnable(TraceLogMgr.getTraceId()) {
- @Override
- public void function() {
- //TODO 根据当前配置信息item 查询远通数据中的历史数据
- //TODO 首先查询当前水厂设备的从昨天之后到得到数据的数据
- LocalDateTime today = LocalDateTime.now();
- LocalDateTime startDateTime = today.withMinute(0).withSecond(0).withNano(0).minusHours(beforHour);
- //TODO 需计算的循环天数
- //TODO 此循环天数每一天所查的是所有设备每小时数据合
- LocalDateTime newStartDateTime = startDateTime;
- String startDate = newStartDateTime.format(formater);
- String endDate = today.withMinute(1).withSecond(0).format(formater);
- List<Map<String, Object>> deviceList = groupedData.get(key);
- //TODO 循环获取该天该水厂每个设备数据
- //TODO 查询当前天日期内每小时的设备数据
- for (int i = 0; i < Integer.valueOf(String.valueOf(Duration.between(startDateTime, today.withSecond(0)).toMinutes()/15)); i++) {
- String startTime = newStartDateTime.minusMinutes(-(i * 15)).format(formater);
- String endTime = newStartDateTime.minusMinutes(-1).minusMinutes(-((i + 1)*15)).format(formater);//查询时间加一分钟
- Map<String, Object> recordAllEntity = new HashMap<>();//需要添加的实体数据
- recordAllEntity.put("org_name", key.toString());//水厂
- recordAllEntity.put("time", newStartDateTime.minusMinutes(-((i + 1)*15)).format(formater));//采集时间(小时的最后时间)
- recordAllEntity.put("value", null);
- recordAllEntity.put("value_tag", "water");
- recordAllEntity.put("collcation_tag_array", FastJsonUtil.toJSON(deviceList));
- Double value = null;
- //TODO 此循环计算该小时所有设备的用水量
- for (Map<String, Object> item : deviceList) {
- //TODO 和尚山水厂的三个OPC的设备0点到1点的数据特殊处理,直接取最后一条数据作为0点到1点的用水量
- // 定义字符串日期时间的格式
- DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
- // 解析字符串以创建 LocalDateTime 实例
- LocalDateTime dateTime = LocalDateTime.parse(endTime, formatter);
- // if (dateTime.getHour()==0&&dateTime.getMinute()==16&&("SS.SSCOM.F36:2".equals(item.get("collcation_tag").toString()) ||
- // "SS.SSCOM.F36:5".equals(item.get("collcation_tag").toString()) ||
- // "SS.SSCOM.F36:8".equals(item.get("collcation_tag").toString()))) {
- // Integer itemCount = getWaterTapWaterApi().getTabWaterHistoryCount(
- // " WHERE TAG_CODE = '" + item.get("collcation_tag") + "' " +
- // " and QCQUISITION_TIME >= to_date('" + startDate + "', 'yyyy-mm-dd hh24:mi:ss')" +
- // " and QCQUISITION_TIME < to_date('" + endDate + "', 'yyyy-mm-dd hh24:mi:ss')");
- // if(itemCount!=null&&itemCount>0) {
- // List<Map<String, Object>> tapWaterHistoryList = getWaterTapWaterApi().getPageZILAISHUI_HISTORY2(itemCount, 0,
- // " AND TAG_CODE = '" + item.get("collcation_tag") + "' " +
- // " and QCQUISITION_TIME >= to_date('" + startTime + "', 'yyyy-mm-dd hh24:mi:ss')" +
- // " and QCQUISITION_TIME < to_date('" + endTime + "', 'yyyy-mm-dd hh24:mi:ss')" +
- // " order by QCQUISITION_TIME");
- // if (!CollectionUtils.isEmpty(tapWaterHistoryList) && tapWaterHistoryList.size() > 1) {
- // if (value == null) {
- // value = 0.00;
- // }
- // value += tapWaterHistoryList.get((tapWaterHistoryList.size() - 1)).get("VAL") != null ? Double.valueOf(tapWaterHistoryList.get((tapWaterHistoryList.size() - 1)).get("val").toString()) : null;
- // }
- // }
- // }else {
- Integer itemCount = getWaterTapWaterApi().getTabWaterHistoryCount(
- " WHERE TAG_CODE = '" + item.get("collcation_tag") + "' " +
- " and QCQUISITION_TIME >= to_date('" + startDate + "', 'yyyy-mm-dd hh24:mi:ss')" +
- " and QCQUISITION_TIME < to_date('" + endDate + "', 'yyyy-mm-dd hh24:mi:ss')");
- if (itemCount != null && itemCount > 0) {
- List<Map<String, Object>> tapWaterHistoryList = getWaterTapWaterApi().getPageZILAISHUI_HISTORY2(itemCount, 0,
- " AND TAG_CODE = '" + item.get("collcation_tag") + "' " +
- " and QCQUISITION_TIME >= to_date('" + startTime + "', 'yyyy-mm-dd hh24:mi:ss')" +
- " and QCQUISITION_TIME < to_date('" + endTime + "', 'yyyy-mm-dd hh24:mi:ss')" +
- " order by QCQUISITION_TIME");
- //TODO tapWaterHistoryList此集合为该设备该小时内的数据,取第一条和最后一条的相差值作为用水量
- if (!CollectionUtils.isEmpty(tapWaterHistoryList) && tapWaterHistoryList.size() > 1) {
- Double firstValue = tapWaterHistoryList.get(0).get("VAL") != null ? Double.valueOf(tapWaterHistoryList.get(0).get("val").toString()) : null;
- Double lastValue = tapWaterHistoryList.get((tapWaterHistoryList.size() - 1)).get("VAL") != null ? Double.valueOf(tapWaterHistoryList.get((tapWaterHistoryList.size() - 1)).get("val").toString()) : null;
- if (firstValue != null && lastValue != null) {
- //到此处是该小时一个设备的用水量已加上
- if (value == null) {
- value = 0.00;
- }
- value += Math.abs(lastValue - firstValue);
- }
- }
- }
- // }
- }
- recordAllEntity.put("value", value);
- List<Map<String, Object>> queryWaterRecord = getWaterTapWaterApi().getWaterCollectionRecordAllListNew(1, 0,
- " WHERE org_name = '" + recordAllEntity.get("org_name")
- + "' AND time = '" + recordAllEntity.get("time") + "' AND value_tag = '" + recordAllEntity.get("value_tag") + "'");
- if (CollectionUtils.isEmpty(queryWaterRecord)) {
- //TODO 说明不存在,进行插入
- if(!ObjectUtils.isEmpty(recordAllEntity.get("value"))) {
- int insertCode = getWaterTapWaterApi().insertWaterCollectionRecordAllNew(" (" +
- "'" + recordAllEntity.get("org_name") + "'," +
- "'" + recordAllEntity.get("time") + "'," +
- "'" + recordAllEntity.get("value") + "'," +
- "'" + recordAllEntity.get("value_tag") + "'," +
- "'" + recordAllEntity.get("collcation_tag_array") + "'" +
- ") ");
- if (insertCode < 0) {
- LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR
- , mStrClassName
- , mStrClassName
- , String.format("Batch initTapWaterDataThread 未成功:{%s} ",
- FastJsonUtil.toJSON(recordAllEntity)));
- }
- }
- }else{
- //TODO 说明存在,进行修改
- if(!ObjectUtils.isEmpty(recordAllEntity.get("value"))) {
- int updateCode = getWaterTapWaterApi().updateWaterCollectionRecordAllNew(String.valueOf(value)," WHERE " +
- "(" +
- " org_name = '" + recordAllEntity.get("org_name") + "' and" +
- " \"time\" = '" + recordAllEntity.get("time") + "' and" +
- " value_tag = '" + recordAllEntity.get("value_tag") + "'" +
- ") ");
- if (updateCode < 0) {
- LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR
- , mStrClassName
- , mStrClassName
- , String.format("Batch updateWaterCollectionRecordAll 未成功:{%s} ",
- FastJsonUtil.toJSON(recordAllEntity)));
- }
- }
- }
- }
- latch.countDown();
- }
- });
- }catch(Exception ex){}
- }
- try{latch.await();}catch(Exception ex){}
- LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO,mStrClassName, mStrClassName,"检查机制-计算15分钟用水量结束时间:"+TimeTool.convertUTC2DateStr(TimeTool.getCurMsUTC(),TimeTool.TIMESTAMP_FORMAT));
- }
- }
- //TODO 小时用水量 每天整点10分检查并写入,检查范围为当前时间到昨天23点
- public static void checkRecordAllData(int beforHour){
- //TODO 检查小时用水量
- DateTimeFormatter formater = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
- LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO,mStrClassName, mStrClassName,"检查机制-计算小时用水量开始时间:"+TimeTool.convertUTC2DateStr(TimeTool.getCurMsUTC(),TimeTool.TIMESTAMP_FORMAT));
- LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO,mStrClassName, mStrClassName,"检查机制-计算小时用水量进行中:......");
- List<Map<String, Object>> configList = getWaterTapWaterApi().getWaterCollectionConfigList("WHERE 1=1 AND org_name not like '%营业所' AND org_name!='北碚水厂' AND org_name!='渝中区水厂'");
- if (!CollectionUtils.isEmpty(configList)) {
- //TODO 按照组织机构分组
- Map<Object, List<Map<String, Object>>> groupedData =
- configList.stream().collect(Collectors.groupingBy(item -> item.get("org_name")));
- final CountDownLatch latch = new CountDownLatch(groupedData.keySet().size());
- for (Object key:groupedData.keySet()){
- try {
- ThreadTaskMgr.runTask(new TraceRunnable(TraceLogMgr.getTraceId()) {
- @Override
- public void function() {
- //TODO 根据当前配置信息item 查询远通数据中的历史数据
- //TODO 首先查询当前水厂设备的从昨天之后到得到数据的数据
- LocalDateTime today = LocalDateTime.now();
- LocalDateTime startDateTime = today.withMinute(0).withSecond(0).withNano(0).minusHours(beforHour);
- //TODO 需计算的循环天数
- //TODO 此循环天数每一天所查的是所有设备每小时数据合
- LocalDateTime newStartDateTime = startDateTime;
- String startDate = newStartDateTime.format(formater);
- String endDate = today.withMinute(1).withSecond(0).format(formater);
- List<Map<String, Object>> deviceList = groupedData.get(key);
- //TODO 循环获取该天该水厂每个设备数据
- //TODO 查询当前天日期内每小时的设备数据
- for (int i = 0; i < Integer.valueOf(String.valueOf(Duration.between(startDateTime, today.withMinute(0).withSecond(0)).toHours())); i++) {
- String startTime = newStartDateTime.minusHours(-i).format(formater);
- String endTime = newStartDateTime.minusMinutes(-1).minusHours(-(i+1)).format(formater);//查询时间加一分钟
- Map<String, Object> recordAllEntity = new HashMap<>();//需要添加的实体数据
- recordAllEntity.put("org_name", key.toString());//水厂
- recordAllEntity.put("time", newStartDateTime.minusHours(-(i+1)).format(formater));//采集时间(小时的最后时间)
- recordAllEntity.put("value", null);
- recordAllEntity.put("value_tag", "water");
- recordAllEntity.put("collcation_tag_array", FastJsonUtil.toJSON(deviceList));
- Double value = null;
- //TODO 此循环计算该小时所有设备的用水量
- for (Map<String, Object> item : deviceList) {
- //TODO 和尚山水厂的三个OPC的设备0点到1点的数据特殊处理,直接取最后一条数据作为0点到1点的用水量
- // 定义字符串日期时间的格式
- DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
- // 解析字符串以创建 LocalDateTime 实例
- LocalDateTime dateTime = LocalDateTime.parse(endTime, formatter);
- if (dateTime.getHour()==1&&("SS.SSCOM.F36:2".equals(item.get("collcation_tag").toString()) ||
- "SS.SSCOM.F36:5".equals(item.get("collcation_tag").toString()) ||
- "SS.SSCOM.F36:8".equals(item.get("collcation_tag").toString()))) {
- Integer itemCount = getWaterTapWaterApi().getTabWaterHistoryCount(
- " WHERE TAG_CODE = '" + item.get("collcation_tag") + "' " +
- " and QCQUISITION_TIME >= to_date('" + startDate + "', 'yyyy-mm-dd hh24:mi:ss')" +
- " and QCQUISITION_TIME < to_date('" + endDate + "', 'yyyy-mm-dd hh24:mi:ss')");
- if(itemCount!=null&&itemCount>0) {
- List<Map<String, Object>> tapWaterHistoryList = getWaterTapWaterApi().getPageZILAISHUI_HISTORY2(itemCount, 0,
- " AND TAG_CODE = '" + item.get("collcation_tag") + "' " +
- " and QCQUISITION_TIME >= to_date('" + startTime + "', 'yyyy-mm-dd hh24:mi:ss')" +
- " and QCQUISITION_TIME < to_date('" + endTime + "', 'yyyy-mm-dd hh24:mi:ss')" +
- " order by QCQUISITION_TIME");
- if (!CollectionUtils.isEmpty(tapWaterHistoryList) && tapWaterHistoryList.size() > 1) {
- if (value == null) {
- value = 0.00;
- }
- value += tapWaterHistoryList.get((tapWaterHistoryList.size() - 1)).get("VAL") != null ? Double.valueOf(tapWaterHistoryList.get((tapWaterHistoryList.size() - 1)).get("val").toString()) : null;
- }
- }
- }else {
- Integer itemCount = getWaterTapWaterApi().getTabWaterHistoryCount(
- " WHERE TAG_CODE = '" + item.get("collcation_tag") + "' " +
- " and QCQUISITION_TIME >= to_date('" + startDate + "', 'yyyy-mm-dd hh24:mi:ss')" +
- " and QCQUISITION_TIME < to_date('" + endDate + "', 'yyyy-mm-dd hh24:mi:ss')");
- if (itemCount != null && itemCount > 0) {
- List<Map<String, Object>> tapWaterHistoryList = getWaterTapWaterApi().getPageZILAISHUI_HISTORY2(itemCount, 0,
- " AND TAG_CODE = '" + item.get("collcation_tag") + "' " +
- " and QCQUISITION_TIME >= to_date('" + startTime + "', 'yyyy-mm-dd hh24:mi:ss')" +
- " and QCQUISITION_TIME < to_date('" + endTime + "', 'yyyy-mm-dd hh24:mi:ss')" +
- " order by QCQUISITION_TIME");
- //TODO tapWaterHistoryList此集合为该设备该小时内的数据,取第一条和最后一条的相差值作为用水量
- if (!CollectionUtils.isEmpty(tapWaterHistoryList) && tapWaterHistoryList.size() > 1) {
- Double firstValue = tapWaterHistoryList.get(0).get("VAL") != null ? Double.valueOf(tapWaterHistoryList.get(0).get("val").toString()) : null;
- Double lastValue = tapWaterHistoryList.get((tapWaterHistoryList.size() - 1)).get("VAL") != null ? Double.valueOf(tapWaterHistoryList.get((tapWaterHistoryList.size() - 1)).get("val").toString()) : null;
- if (firstValue != null && lastValue != null) {
- //到此处是该小时一个设备的用水量已加上
- if (value == null) {
- value = 0.00;
- }
- value += Math.abs(lastValue - firstValue);
- }
- }
- }
- }
- }
- recordAllEntity.put("value", value);
- List<Map<String, Object>> queryWaterRecord = getWaterTapWaterApi().getWaterCollectionRecordAllList(1, 0,
- " WHERE org_name = '" + recordAllEntity.get("org_name")
- + "' AND time = '" + recordAllEntity.get("time") + "' AND value_tag = '" + recordAllEntity.get("value_tag") + "'");
- if (CollectionUtils.isEmpty(queryWaterRecord)) {
- //TODO 说明不存在,进行插入
- if(!ObjectUtils.isEmpty(recordAllEntity.get("value"))) {
- int insertCode = getWaterTapWaterApi().insertWaterCollectionRecordAll(" (" +
- "'" + recordAllEntity.get("org_name") + "'," +
- "'" + recordAllEntity.get("time") + "'," +
- "'" + recordAllEntity.get("value") + "'," +
- "'" + recordAllEntity.get("value_tag") + "'," +
- "'" + recordAllEntity.get("collcation_tag_array") + "'" +
- ") ");
- if (insertCode < 0) {
- LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR
- , mStrClassName
- , mStrClassName
- , String.format("Batch initTapWaterDataThread 未成功:{%s} ",
- FastJsonUtil.toJSON(recordAllEntity)));
- }
- }
- }else{
- //TODO 说明存在,进行修改
- if(!ObjectUtils.isEmpty(recordAllEntity.get("value"))) {
- int updateCode = getWaterTapWaterApi().updateWaterCollectionRecordAll(String.valueOf(value)," WHERE " +
- "(" +
- " org_name = '" + recordAllEntity.get("org_name") + "' and" +
- " \"time\" = '" + recordAllEntity.get("time") + "' and" +
- " value_tag = '" + recordAllEntity.get("value_tag") + "'" +
- ") ");
- if (updateCode < 0) {
- LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR
- , mStrClassName
- , mStrClassName
- , String.format("Batch updateWaterCollectionRecordAll 未成功:{%s} ",
- FastJsonUtil.toJSON(recordAllEntity)));
- }
- }
- }
- }
- latch.countDown();
- }
- });
- }catch(Exception ex){}
- }
- try{latch.await();}catch(Exception ex){}
- LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO,mStrClassName, mStrClassName,"检查机制-计算小时用水量结束时间:"+TimeTool.convertUTC2DateStr(TimeTool.getCurMsUTC(),TimeTool.TIMESTAMP_FORMAT));
- }
- }
- //TODO 原始历史数据 每天整点15分检查并写入,检查范围为当前时间到昨天
- public static void checkRecordData(){
- //TODO ① 首先查询水厂设备配置信息
- try {
- List<Map<String, Object>> configList = getWaterTapWaterApi().getWaterCollectionConfigList("WHERE 1=1 AND org_name not like '%营业所' AND org_name!='北碚水厂' AND org_name!='渝中区水厂'");
- if(!CollectionUtils.isEmpty(configList)){
- //声明总数据的数据数组
- List<Map<String,Object>> newRecordAll = Collections.synchronizedList(new LinkedList<Map<String,Object>>());
- //TODO ②开启多线程并发处理各水厂设备的数据
- LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO,mStrClassName, mStrClassName,"检查机制--开始时间:"+TimeTool.convertUTC2DateStr(TimeTool.getCurMsUTC(),TimeTool.TIMESTAMP_FORMAT));
- LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO,mStrClassName, mStrClassName,"检查机制--进行中:......");
- final CountDownLatch latch = new CountDownLatch(configList.size());
- for(Map<String,Object> item:configList){
- try{
- ThreadTaskMgr.runTask(new TraceRunnable(TraceLogMgr.getTraceId()) {
- @Override
- public void function() {
- //TODO 根据当前配置信息item 查询远通数据中的历史数据
- //TODO 首先查询当前水厂设备的昨天0点之后到当前时间最新数据的数据
- LocalDateTime today = LocalDateTime.now();
- LocalDateTime yesterday = today.minusDays(1).withHour(0).withMinute(0).withSecond(0).withNano(0);
- Integer itemCount = getWaterTapWaterApi().getTabWaterHistoryCount(" WHERE TAG_CODE = '"+item.get("collcation_tag")+"' " +
- " and QCQUISITION_TIME >= to_date('"+yesterday.format(formater)+"', 'yyyy-mm-dd hh24:mi:ss')");
- if(itemCount!=null&&itemCount>0) {
- //TODO 优化 以分页方式查询所有,初始分页行数定为2000查询速率较好
- int pageNum = itemCount%2000==0?itemCount/2000:(itemCount/2000)+1;//总页数
- Integer limit = 2000;
- if(pageNum<=1){
- limit = itemCount;//说明总数比第一页小
- }
- for (int i = 0; i < pageNum; i++) {
- Integer offset = i*limit;
- //tapWaterHistoryList 为远通水量数据源
- //TODO 因为是orcale数据库 分页方式以伪列rownum为分页条件,因此limit实际上永远比offset大2000,因此只要不是第一页,limit=offset+2000
- Integer limitNew = 0;
- if(i>0){
- limitNew = offset+2000;
- }else{
- limitNew = 2000;
- }
- List<Map<String,Object>> tapWaterHistoryList = getWaterTapWaterApi().getPageZILAISHUI_HISTORY2(limitNew,offset," " +
- "AND TAG_CODE = '"+item.get("collcation_tag")+"' " +
- "and QCQUISITION_TIME >= to_date('"+yesterday.format(formater)+"', 'yyyy-mm-dd hh24:mi:ss')");
- if(!CollectionUtils.isEmpty(tapWaterHistoryList)){
- //TODO 循环远通水量数据列表,查询数据不存在的话就插入
- for (int j = 0; j < tapWaterHistoryList.size(); j++) {
- List<Map<String,Object>> queryWaterRecord = getWaterTapWaterApi().getWaterCollectionRecordList(1,0,
- " WHERE collcation_tag = '"+tapWaterHistoryList.get(j).get("TAG_CODE")
- +"' AND org_name = '"+tapWaterHistoryList.get(j).get("NAME")+"' AND time = '"+TimeTool.convertDateStr2UTC(tapWaterHistoryList.get(j).get("QCQUISITION_TIME").toString())+"'");
- if(CollectionUtils.isEmpty(queryWaterRecord)){
- //TODO 说明没插入过本系统,执行插入
- String extend = " ('"+tapWaterHistoryList.get(j).get("TAG_CODE")+"','"+tapWaterHistoryList.get(j).get("NAME")+"',"
- + ""+TimeTool.convertDateStr2UTC(tapWaterHistoryList.get(j).get("QCQUISITION_TIME").toString())+","
- + "'"+tapWaterHistoryList.get(j).get("VAL")+"',"
- + TimeTool.convertDateStr2UTC(tapWaterHistoryList.get(j).get("UPDATE_TIME").toString())
- +") ";
- int insertCode = getWaterTapWaterApi().insertWaterCollectionRecord(extend);
- if(insertCode<=0){
- LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR
- , mStrClassName
- , mStrClassName
- , String.format("Batch initTapWaterDataThread 未成功:{%s} ",
- extend));
- }
- }
- }
- }
- }
- }
- latch.countDown();
- }
- });
- }catch(Exception ex){
- LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR
- , mStrClassName
- , mStrClassName
- , String.format("Batch initTapWaterDataThread 水厂:{%s},标识:{%s} ERROR:{%s} ",
- item.get("org_name"),
- item.get("collcation_tag")
- , ex.getLocalizedMessage()));
- }
- }
- latch.await();
- LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO,mStrClassName, mStrClassName,"检查机制--结束时间:"+TimeTool.convertUTC2DateStr(TimeTool.getCurMsUTC(),TimeTool.TIMESTAMP_FORMAT));
- }
- }catch(Exception ex){
- LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR
- , mStrClassName
- , mStrClassName
- , String.format("Batch initTapWaterData ERROR:{%s} "
- , ex.getLocalizedMessage()));
- }
- }
- //TODO 初始化添加对比远通数据
- public static void initTapWaterData(){
- //TODO ① 首先查询水厂设备配置信息
- try {
- List<Map<String, Object>> configList = getWaterTapWaterApi().getWaterCollectionConfigList("WHERE 1=1 AND org_name not like '%营业所' AND org_name!='北碚水厂' AND org_name!='渝中区水厂'");
- if(!CollectionUtils.isEmpty(configList)){
- //声明总数据的数据数组
- List<Map<String,Object>> newRecordAll = Collections.synchronizedList(new LinkedList<Map<String,Object>>());
- //TODO ②开启多线程并发处理各水厂设备的数据
- LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO,mStrClassName, mStrClassName,"比对远通原始数据开始时间:"+TimeTool.convertUTC2DateStr(TimeTool.getCurMsUTC(),TimeTool.TIMESTAMP_FORMAT));
- LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO,mStrClassName, mStrClassName,"进行中:......");
- final CountDownLatch latch = new CountDownLatch(configList.size());
- LocalDateTime startTimeLocal = LocalDateTime.now().minusDays(1).minusHours(1);
- for(Map<String,Object> item:configList){
- try{
- ThreadTaskMgr.runTask(new TraceRunnable(TraceLogMgr.getTraceId()) {
- @Override
- public void function() {
- //TODO 根据当前配置信息item 查询远通数据中的历史数据
- //TODO 首先查询当前水厂设备的2025-01-01之后到最新数据的数据总数,然后分页形式获取数据
- Integer itemCount = getWaterTapWaterApi().getTabWaterHistoryCount(" WHERE TAG_CODE = '"+item.get("collcation_tag")+"' and QCQUISITION_TIME >= to_date('"+startTimeLocal.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))+"', 'yyyy-mm-dd hh24:mi:ss')");
- if(itemCount!=null&&itemCount>0) {
- //TODO 优化 以分页方式查询所有,初始分页行数定为2000查询速率较好
- int pageNum = itemCount%2000==0?itemCount/2000:(itemCount/2000)+1;//总页数
- Integer limit = 2000;
- if(pageNum<=1){
- limit = itemCount;//说明总数比第一页小
- }
- for (int i = 0; i < pageNum; i++) {
- Integer offset = i*limit;
- //tapWaterHistoryList 为远通水量数据源
- //TODO 因为是orcale数据库 分页方式以伪列rownum为分页条件,因此limit实际上永远比offset大2000,因此只要不是第一页,limit=offset+2000
- Integer limitNew = 0;
- if(i>0){
- limitNew = offset+2000;
- }else{
- limitNew = 2000;
- }
- List<Map<String,Object>> tapWaterHistoryList = getWaterTapWaterApi().getPageZILAISHUI_HISTORY2(limitNew,offset," AND TAG_CODE = '"+item.get("collcation_tag")+"' and QCQUISITION_TIME >= to_date('"+startTimeLocal.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))+"', 'yyyy-mm-dd hh24:mi:ss')");
- if(!CollectionUtils.isEmpty(tapWaterHistoryList)){
- //TODO 循环远通水量数据列表,查询数据不存在的话就插入
- for (int j = 0; j < tapWaterHistoryList.size(); j++) {
- List<Map<String,Object>> queryWaterRecord = getWaterTapWaterApi().getWaterCollectionRecordList(1,0,
- " WHERE collcation_tag = '"+tapWaterHistoryList.get(j).get("TAG_CODE")
- +"' AND org_name = '"+tapWaterHistoryList.get(j).get("NAME")+"' AND time = '"+TimeTool.convertDateStr2UTC(tapWaterHistoryList.get(j).get("QCQUISITION_TIME").toString())+"'");
- if(CollectionUtils.isEmpty(queryWaterRecord)){
- //TODO 说明没插入过本系统,执行插入
- String extend = " ('"+tapWaterHistoryList.get(j).get("TAG_CODE")+"','"+tapWaterHistoryList.get(j).get("NAME")+"',"
- + ""+TimeTool.convertDateStr2UTC(tapWaterHistoryList.get(j).get("QCQUISITION_TIME").toString())+","
- + "'"+tapWaterHistoryList.get(j).get("VAL")+"',"
- + TimeTool.convertDateStr2UTC(tapWaterHistoryList.get(j).get("UPDATE_TIME").toString())
- +") ";
- int insertCode = getWaterTapWaterApi().insertWaterCollectionRecord(extend);
- if(insertCode<=0){
- LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR
- , mStrClassName
- , mStrClassName
- , String.format("Batch initTapWaterDataThread 未成功:{%s} ",
- extend));
- }
- }
- }
- }
- }
- }
- latch.countDown();
- }
- });
- }catch(Exception ex){
- LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR
- , mStrClassName
- , mStrClassName
- , String.format("Batch initTapWaterDataThread 水厂:{%s},标识:{%s} ERROR:{%s} ",
- item.get("org_name"),
- item.get("collcation_tag")
- , ex.getLocalizedMessage()));
- }
- }
- latch.await();
- LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO,mStrClassName, mStrClassName,"比对远通原始数据结束时间:"+TimeTool.convertUTC2DateStr(TimeTool.getCurMsUTC(),TimeTool.TIMESTAMP_FORMAT));
- }
- }catch(Exception ex){
- LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR
- , mStrClassName
- , mStrClassName
- , String.format("Batch initTapWaterData ERROR:{%s} "
- , ex.getLocalizedMessage()));
- }
- }
- //TODO 初始化添加计算水厂所有设备每日的每小时用水量计算
- public static void initWaterCollecationReacordAll(){
- //TODO ① 首先查询水厂设备配置信息
- try {
- LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO,mStrClassName, mStrClassName,"计算小时用水量开始时间:"+TimeTool.convertUTC2DateStr(TimeTool.getCurMsUTC(),TimeTool.TIMESTAMP_FORMAT));
- LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO,mStrClassName, mStrClassName,"计算小时用水量进行中:......");
- List<Map<String, Object>> configList = getWaterTapWaterApi().getWaterCollectionConfigList("WHERE 1=1 AND org_name not like '%营业所' AND org_name!='北碚水厂' AND org_name!='渝中区水厂'");
- if (!CollectionUtils.isEmpty(configList)) {
- //TODO 按照组织机构分组
- Map<Object, List<Map<String, Object>>> groupedData =
- configList.stream().collect(Collectors.groupingBy(item -> item.get("org_name")));
- final CountDownLatch latch = new CountDownLatch(groupedData.keySet().size());
- //TODO 外层循环组织机构
- for (Object key:groupedData.keySet()){
- try{
- ThreadTaskMgr.runTask(new TraceRunnable(TraceLogMgr.getTraceId()) {
- @Override
- public void function() {
- //TODO 根据当前配置信息item 查询远通数据中的历史数据
- //TODO 首先查询当前水厂设备的从2025-01-01之后到得到数据
- // LocalDateTime startDateTime = LocalDateTime.now().withMinute(0).withSecond(0)
- // .minusDays(1).minusHours(1);
- LocalDateTime startDateTime = LocalDateTime.now().withHour(0).withMinute(0).withSecond(0)
- .minusDays(3);
- //TODO 需计算的循环天数
- Long days = 0L;
- days = Duration.between(startDateTime, LocalDateTime.now()).toDays();
- //TODO 此循环天数每一天所查的是所有设备每小时数据合
- final CountDownLatch latch2 = new CountDownLatch(days.intValue());
- for(Long k = 0L;k<days;k++) {
- LocalDateTime newStartDateTime = startDateTime.minusDays(-k.intValue());
- String startDate = newStartDateTime.format(formater);
- String endDate = newStartDateTime.withMinute(1).minusDays(-1).format(formater);
- try {
- ThreadTaskMgr.runTask(new TraceRunnable(TraceLogMgr.getTraceId()) {
- @Override
- public void function() {
- List<Map<String, Object>> deviceList = groupedData.get(key);
- //TODO 循环获取该天该水厂每个设备数据
- //TODO 查询当前天日期内每15分钟的设备数据
- for (int i = 0; i < 96; i++) {
- String startTime = newStartDateTime.minusHours(-i).format(formater);
- String endTime = newStartDateTime.minusMinutes(-1).minusHours(-(i+1)).format(formater);//查询时间加一分钟
- Map<String, Object> recordAllEntity = new HashMap<>();//需要添加的实体数据
- recordAllEntity.put("org_name", key.toString());//水厂
- recordAllEntity.put("time", newStartDateTime.minusHours(-(i+1)).format(formater));//采集时间(小时的最后时间)
- recordAllEntity.put("value", null);
- recordAllEntity.put("value_tag", "water");
- recordAllEntity.put("collcation_tag_array", FastJsonUtil.toJSON(deviceList));
- Double value = null;
- //TODO 此循环计算该小时所有设备的用水量
- for (Map<String, Object> item : deviceList) {
- //TODO 和尚山水厂的三个OPC的设备0点到1点的数据特殊处理,直接取最后一条数据作为0点到1点的用水量
- // 定义字符串日期时间的格式
- DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
- // 解析字符串以创建 LocalDateTime 实例
- LocalDateTime dateTime = LocalDateTime.parse(endTime, formatter);
- // if (dateTime.getHour()==1&&("SS.SSCOM.F36:2".equals(item.get("collcation_tag").toString()) ||
- // "SS.SSCOM.F36:5".equals(item.get("collcation_tag").toString()) ||
- // "SS.SSCOM.F36:8".equals(item.get("collcation_tag").toString()))) {
- // Integer itemCount = getWaterTapWaterApi().getTabWaterHistoryCount(
- // " WHERE TAG_CODE = '" + item.get("collcation_tag") + "' " +
- // " and QCQUISITION_TIME >= to_date('" + startDate + "', 'yyyy-mm-dd hh24:mi:ss')" +
- // " and QCQUISITION_TIME < to_date('" + endDate + "', 'yyyy-mm-dd hh24:mi:ss')");
- // if(itemCount!=null&&itemCount>0) {
- // List<Map<String, Object>> tapWaterHistoryList = getWaterTapWaterApi().getPageZILAISHUI_HISTORY2(itemCount, 0,
- // " AND TAG_CODE = '" + item.get("collcation_tag") + "' " +
- // " and QCQUISITION_TIME >= to_date('" + startTime + "', 'yyyy-mm-dd hh24:mi:ss')" +
- // " and QCQUISITION_TIME < to_date('" + endTime + "', 'yyyy-mm-dd hh24:mi:ss')" +
- // " order by QCQUISITION_TIME");
- // if (!CollectionUtils.isEmpty(tapWaterHistoryList) && tapWaterHistoryList.size() > 1) {
- // if (value == null) {
- // value = 0.00;
- // }
- // value += tapWaterHistoryList.get((tapWaterHistoryList.size() - 1)).get("VAL") != null ? Double.valueOf(tapWaterHistoryList.get((tapWaterHistoryList.size() - 1)).get("val").toString()) : null;
- // }
- // }
- // }else {
- Integer itemCount = getWaterTapWaterApi().getTabWaterHistoryCount(
- " WHERE TAG_CODE = '" + item.get("collcation_tag") + "' " +
- " and QCQUISITION_TIME >= to_date('" + startDate + "', 'yyyy-mm-dd hh24:mi:ss')" +
- " and QCQUISITION_TIME < to_date('" + endDate + "', 'yyyy-mm-dd hh24:mi:ss')");
- if (itemCount != null && itemCount > 0) {
- List<Map<String, Object>> tapWaterHistoryList = getWaterTapWaterApi().getPageZILAISHUI_HISTORY2(itemCount, 0,
- " AND TAG_CODE = '" + item.get("collcation_tag") + "' " +
- " and QCQUISITION_TIME >= to_date('" + startTime + "', 'yyyy-mm-dd hh24:mi:ss')" +
- " and QCQUISITION_TIME < to_date('" + endTime + "', 'yyyy-mm-dd hh24:mi:ss')" +
- " order by QCQUISITION_TIME");
- //TODO tapWaterHistoryList此集合为该设备该小时内的数据,取第一条和最后一条的相差值作为用水量
- if (!CollectionUtils.isEmpty(tapWaterHistoryList) && tapWaterHistoryList.size() > 1) {
- Double firstValue = tapWaterHistoryList.get(0).get("VAL") != null ? Double.valueOf(tapWaterHistoryList.get(0).get("val").toString()) : null;
- Double lastValue = tapWaterHistoryList.get((tapWaterHistoryList.size() - 1)).get("VAL") != null ? Double.valueOf(tapWaterHistoryList.get((tapWaterHistoryList.size() - 1)).get("val").toString()) : null;
- if (firstValue != null && lastValue != null) {
- //到此处是该小时一个设备的用水量已加上
- if (value == null) {
- value = 0.00;
- }
- value += Math.abs(lastValue - firstValue);
- }
- }
- }
- // }
- }
- recordAllEntity.put("value", value);
- List<Map<String, Object>> queryWaterRecord = getWaterTapWaterApi().getWaterCollectionRecordAllList(1, 0,
- " WHERE org_name = '" + recordAllEntity.get("org_name")
- + "' AND time = '" + recordAllEntity.get("time") + "' AND value_tag = '" + recordAllEntity.get("value_tag") + "'");
- if (CollectionUtils.isEmpty(queryWaterRecord)) {
- //TODO 说明不存在,进行插入
- if(!ObjectUtils.isEmpty(recordAllEntity.get("value"))) {
- int insertCode = getWaterTapWaterApi().insertWaterCollectionRecordAll(" (" +
- "'" + recordAllEntity.get("org_name") + "'," +
- "'" + recordAllEntity.get("time") + "'," +
- "'" + recordAllEntity.get("value") + "'," +
- "'" + recordAllEntity.get("value_tag") + "'," +
- "'" + recordAllEntity.get("collcation_tag_array") + "'" +
- ") ");
- if (insertCode < 0) {
- LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR
- , mStrClassName
- , mStrClassName
- , String.format("Batch initTapWaterDataThread 未成功:{%s} ",
- FastJsonUtil.toJSON(recordAllEntity)));
- }
- }
- }else{
- //TODO 说明存在,进行修改
- if(!ObjectUtils.isEmpty(recordAllEntity.get("value"))) {
- int updateCode = getWaterTapWaterApi().updateWaterCollectionRecordAll(String.valueOf(value)," WHERE " +
- "(" +
- " org_name = '" + recordAllEntity.get("org_name") + "' and" +
- " \"time\" = '" + recordAllEntity.get("time") + "' and" +
- " value_tag = '" + recordAllEntity.get("value_tag") + "'" +
- ") ");
- if (updateCode < 0) {
- LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR
- , mStrClassName
- , mStrClassName
- , String.format("Batch updateWaterCollectionRecordAll 未成功:{%s} ",
- FastJsonUtil.toJSON(recordAllEntity)));
- }
- }
- }
- }
- latch2.countDown();
- }
- });
- }catch(Exception ex){
- }
- }
- try {
- latch2.await();
- }catch(Exception ex){
- }
- latch.countDown();
- }
- });
- }catch(Exception ex){
- LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR
- , mStrClassName
- , mStrClassName
- , String.format("Batch" +
- " initWaterReacordAll ERROR:{%s} "
- , ex.getLocalizedMessage()));
- }
- }
- latch.await();
- LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO,mStrClassName, mStrClassName,"计算小时用水量结束时间:"+TimeTool.convertUTC2DateStr(TimeTool.getCurMsUTC(),TimeTool.TIMESTAMP_FORMAT));
- }
- }catch(Exception ex){
- LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR
- , mStrClassName
- , mStrClassName
- , String.format("Batch initWaterCollecationReacordAll ERROR:{%s} "
- , ex.getLocalizedMessage()));
- }
- }
- //TODO 初始化添加计算泵每日的小时数据
- public static void initWaterPumpReacordAll(String startFindTime){
- //TODO ① 首先查询水厂设备配置信息
- try {
- LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO,mStrClassName, mStrClassName,"添加小时泵数据开始时间:"+ TimeTool.convertUTC2DateStr(TimeTool.getCurMsUTC(),TimeTool.TIMESTAMP_FORMAT));
- LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO,mStrClassName, mStrClassName,"添加小时泵数据起始时间:("+startFindTime+")进行中:......");
- List<Map<String, Object>> configList = getWaterTapWaterApi().getWaterPumpCollectionConfigList(null);
- if (!CollectionUtils.isEmpty(configList)) {
- //TODO 按照设备分组
- Map<Object, List<Map<String, Object>>> groupedData =
- configList.stream().collect(Collectors.groupingBy(item -> item.get("device_code")));
- final CountDownLatch latch = new CountDownLatch(groupedData.keySet().size());
- //TODO 外层循环组织机构
- for (Object key:groupedData.keySet()){
- try{
- new Thread(() -> {
- //TODO 根据当前配置信息item 查询远通数据中的历史数据
- //TODO 首先查询当前水厂设备的从startFindTime之后到得到数据
- LocalDateTime startDateTime = LocalDateTime.parse(startFindTime, DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
- //TODO 需计算的循环天数
- Long days = 0L;
- days = Duration.between(startDateTime, LocalDateTime.now()).toDays();
- days+=1;
- //TODO 此循环天数每一天所查的是所有设备每小时数据合
- final CountDownLatch latch2 = new CountDownLatch(days.intValue());
- for(Long k = 0L;k<days;k++) {
- LocalDateTime newStartDateTime = startDateTime.minusDays(-k.intValue());
- String startDate = newStartDateTime.format(formater);
- String endDate = newStartDateTime.withMinute(1).minusDays(-1).format(formater);
- try {
- new Thread(() -> {
- List<Map<String, Object>> deviceList = groupedData.get(key);
- //TODO 循环获取该天该水厂每个设备数据
- //TODO 查询当前天日期内每小时的设备数据
- for (int i = 0; i < 24; i++) {
- String startTime = newStartDateTime.withHour(i).format(formater);
- //TODO 需要加个05分把endTime的整点数据查出来
- String endTime = newStartDateTime.withHour(i).minusHours(-1).withMinute(5).withSecond(0).format(formater);
- //TODO 此循环计算该小时所有设备的泵数据
- for (Map<String, Object> item : deviceList) {
- // 定义字符串日期时间的格式
- DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
- // 解析字符串以创建 LocalDateTime 实例
- LocalDateTime dateTime = LocalDateTime.parse(endTime, formatter);
- Integer itemCount = getWaterTapWaterApi().getTabWaterHistoryCount(
- " WHERE 1=1 AND TAG_CODE = '" + item.get("collcation_tag") + "' " +
- " and QCQUISITION_TIME >= to_date('" + startDate + "', 'yyyy-mm-dd hh24:mi:ss')" +
- " and QCQUISITION_TIME <= to_date('" + endDate + "', 'yyyy-mm-dd hh24:mi:ss')" +
- " AND ABS(" +
- " (QCQUISITION_TIME - TRUNC(QCQUISITION_TIME, 'HH')) * 24 * 60 " +
- " ) <= 5");//查询 -- 采集时间接近整点,误差在5分钟内
- if (itemCount != null && itemCount > 0) {
- List<Map<String, Object>> tapWaterHistoryList = getWaterTapWaterApi().getPageZILAISHUI_HISTORY2(itemCount, 0,
- " AND TAG_CODE = '" + item.get("collcation_tag") + "' " +
- " and QCQUISITION_TIME >= to_date('" + startTime + "', 'yyyy-mm-dd hh24:mi:ss')" +
- " and QCQUISITION_TIME <= to_date('" + endTime + "', 'yyyy-mm-dd hh24:mi:ss')" +
- " AND ABS( " +
- " (QCQUISITION_TIME - TRUNC(QCQUISITION_TIME, 'HH')) * 24 * 60 " +
- " ) <= 5 " +
- " order by QCQUISITION_TIME");
- //TODO 这里需要注意泵数据的采集配置表设备号可能有多个,因此,要判断对应的standard_code做对应处理
- if (!CollectionUtils.isEmpty(tapWaterHistoryList) && tapWaterHistoryList.size() >= 1) {
- //TODO 数据库操作
- for (Map<String,Object> mapEntity:tapWaterHistoryList) {
- Map<String, Object> recordAllEntity = new HashMap<>();//需要添加的实体数据
- recordAllEntity.put("device_code", groupedData.get(key).get(0).get("device_code"));
- recordAllEntity.put("time", LocalDateTime.parse(mapEntity.get("QCQUISITION_TIME").toString(), formatter)
- .withMinute(0).withSecond(0).format(formatter));//采集时间(小时的最后时间)
- if(item.get("standard_code").equals("active_energy")){
- recordAllEntity.put("active_energy",mapEntity.get("VAL"));
- }
- if(item.get("standard_code").equals("startup_state")){
- recordAllEntity.put("startup_state",mapEntity.get("VAL"));
- }
- if(item.get("standard_code").equals("phase_a_current")){
- if(ObjectUtils.isEmpty(mapEntity.get("VAL"))||"0".equals(mapEntity.get("VAL").toString())) {
- recordAllEntity.put("startup_state", 0);
- }else{
- recordAllEntity.put("startup_state", 1);
- }
- }
- //TODO 直接调用插入或者新增方法
- int insertCode = getWaterTapWaterApi().insertOrUpdateWaterPumpRecordAll(recordAllEntity);
- if (insertCode < 0) {
- System.out.print(String.format("Batch insertOrUpdateWaterPumpRecordAll 未成功:{%s} ",
- JSONObject.toJSON(recordAllEntity)));
- }
- }
- }
- }
- }
- }
- latch2.countDown();
- }).start();
- }catch(Exception ex){
- }
- }
- try {
- latch2.await();
- }catch(Exception ex){
- }
- latch.countDown();
- }).start();
- }catch(Exception ex){
- System.out.print(String.format("Batch" +
- " initWaterReacordAll ERROR:{%s} "
- , ex.getLocalizedMessage()));
- }
- }
- latch.await();
- LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO,mStrClassName, mStrClassName,"添加小时泵数据检查机制("+startFindTime+")结束时间:"+TimeTool.convertUTC2DateStr(TimeTool.getCurMsUTC(),TimeTool.TIMESTAMP_FORMAT));
- }
- }catch(Exception ex){
- System.out.print(String.format("Batch initWaterCollecationReacordAll ERROR:{%s} "
- , ex.getLocalizedMessage()));
- }
- }
- }
|