|
|
@@ -100,45 +100,55 @@ public class DataCollector {
|
|
|
new BasicHeader(IotPlatformMetadata.Headers.PANDA_TOKEN, key.getAccessKey()),
|
|
|
new BasicHeader(IotPlatformMetadata.Headers.AUTHORIZATION, key.getSecretKey())
|
|
|
);
|
|
|
-
|
|
|
- //查询时间
|
|
|
- LocalDateTime beginTime = LocalDateTime.now().truncatedTo(ChronoUnit.HOURS).minusHours(previousHours);
|
|
|
- LocalDateTime endTime = LocalDateTime.now().truncatedTo(ChronoUnit.HOURS);
|
|
|
-
|
|
|
//结果
|
|
|
List<Point> results = new ArrayList<>();
|
|
|
|
|
|
//===================按设备类型遍历===================
|
|
|
for (IotPlatformMetadata.DeviceMapping deviceMapping : IotPlatformMetadata.DeviceMapping.values()) {
|
|
|
- //参数
|
|
|
- Map<String, Object> params = IotPlatformMetadata.getHistoryDataParams(deviceMapping, beginTime, endTime);
|
|
|
- //请求结果项
|
|
|
- List<IotPlatformData> items;
|
|
|
- try {
|
|
|
- items = callingUtil.requestList(url, HttpMethod.POST, params, headers,
|
|
|
- new TypeReference<IotPlatformResult<List<IotPlatformData>>>() {
|
|
|
- });
|
|
|
- LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO, BIZ_TYPE, CLASS_NAME
|
|
|
- , String.format("拉取数据成功,数据量:%d", items.size()));
|
|
|
- } catch (SelfException e) {
|
|
|
- LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR, BIZ_TYPE, CLASS_NAME
|
|
|
- , String.format("拉取数据失败 error:%s", e)
|
|
|
- );
|
|
|
- continue;
|
|
|
+ //查询时间
|
|
|
+ LocalDateTime beginTime = LocalDateTime.now().truncatedTo(ChronoUnit.HOURS).minusHours(previousHours);
|
|
|
+ LocalDateTime endTime = LocalDateTime.now().truncatedTo(ChronoUnit.HOURS);
|
|
|
+ //查询分段数量
|
|
|
+ int segmentCount = (int) Math.ceil((double) previousHours / IotPlatformMetadata.MAX_QUERY_HOU);
|
|
|
+ //查询结果
|
|
|
+ List<IotPlatformData> allItems = new ArrayList<>();
|
|
|
+ //===================按时间分段查询===================
|
|
|
+ for (int i = 0; i < segmentCount; i++) {
|
|
|
+ //计算当前分段的开始和结束时间
|
|
|
+ LocalDateTime segmentStart = beginTime.plusHours((long) i * IotPlatformMetadata.MAX_QUERY_HOU);
|
|
|
+ LocalDateTime segmentEnd = (i == segmentCount - 1) ? endTime : segmentStart.plusHours(IotPlatformMetadata.MAX_QUERY_HOU);
|
|
|
+ //确保结束时间不超过总结束时间
|
|
|
+ if (segmentEnd.isAfter(endTime)) segmentEnd = endTime;
|
|
|
+ //参数
|
|
|
+ Map<String, Object> params = IotPlatformMetadata.getHistoryDataParams(deviceMapping, segmentStart, segmentEnd);
|
|
|
+ //请求结果项
|
|
|
+ List<IotPlatformData> items;
|
|
|
+ try {
|
|
|
+ items = callingUtil.requestList(url, HttpMethod.POST, params, headers,
|
|
|
+ new TypeReference<IotPlatformResult<List<IotPlatformData>>>() {
|
|
|
+ });
|
|
|
+ LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO, BIZ_TYPE, CLASS_NAME
|
|
|
+ , String.format("拉取数据成功,数据量:%d", items.size()));
|
|
|
+ } catch (SelfException e) {
|
|
|
+ LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR, BIZ_TYPE, CLASS_NAME
|
|
|
+ , String.format("拉取数据失败 error:%s", e)
|
|
|
+ );
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ allItems.addAll(items);
|
|
|
}
|
|
|
|
|
|
+ //===================处理设备注册更新===================
|
|
|
//按设备(远传id)分组
|
|
|
- Map<String, List<IotPlatformData>> snGroup = items.stream()
|
|
|
+ Map<String, List<IotPlatformData>> snGroup = allItems.stream()
|
|
|
.collect(Collectors.groupingBy(IotPlatformData::getCode));
|
|
|
//注册设备
|
|
|
registryDevices(deviceMapping, snGroup);
|
|
|
//更新设备
|
|
|
updateDevices(deviceMapping, snGroup);
|
|
|
|
|
|
- //设备列表
|
|
|
- List<Device> devices = deviceRegistry.findAll(IotPlatformMetadata.NAME);
|
|
|
//===================按设备遍历===================
|
|
|
- for (Device device : devices) {
|
|
|
+ for (Device device : deviceRegistry.findAll(IotPlatformMetadata.NAME)) {
|
|
|
if (!snGroup.containsKey(device.getDeviceSn())) continue;
|
|
|
//当前设备项
|
|
|
List<IotPlatformData> deviceItems = snGroup.get(device.getDeviceSn());
|