|
@@ -3,19 +3,16 @@ package com.shkpr.iot.driver.mqtt.service;
|
|
import com.shkpr.iot.common.core.domain.po.DataInfo;
|
|
import com.shkpr.iot.common.core.domain.po.DataInfo;
|
|
import com.shkpr.iot.common.core.domain.po.Destination;
|
|
import com.shkpr.iot.common.core.domain.po.Destination;
|
|
import com.shkpr.iot.common.core.domain.po.Equipment;
|
|
import com.shkpr.iot.common.core.domain.po.Equipment;
|
|
|
|
+import com.shkpr.iot.common.driver.properties.DriverProperties;
|
|
import com.shkpr.iot.common.driver.service.DriverService;
|
|
import com.shkpr.iot.common.driver.service.DriverService;
|
|
import com.shkpr.iot.driver.mqtt.common.context.MqttHandlerContext;
|
|
import com.shkpr.iot.driver.mqtt.common.context.MqttHandlerContext;
|
|
import com.shkpr.iot.driver.mqtt.handler.MqttInboundHandler;
|
|
import com.shkpr.iot.driver.mqtt.handler.MqttInboundHandler;
|
|
import lombok.SneakyThrows;
|
|
import lombok.SneakyThrows;
|
|
import lombok.extern.slf4j.Slf4j;
|
|
import lombok.extern.slf4j.Slf4j;
|
|
import org.apache.commons.lang3.StringUtils;
|
|
import org.apache.commons.lang3.StringUtils;
|
|
-import org.eclipse.paho.client.mqttv3.IMqttActionListener;
|
|
|
|
-import org.eclipse.paho.client.mqttv3.IMqttToken;
|
|
|
|
-import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
|
|
|
|
-import org.eclipse.paho.client.mqttv3.MqttClient;
|
|
|
|
-import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
|
|
|
|
-import org.eclipse.paho.client.mqttv3.MqttException;
|
|
|
|
|
|
+import org.eclipse.paho.client.mqttv3.*;
|
|
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
|
|
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
|
|
|
|
+import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
|
|
import org.springframework.stereotype.Component;
|
|
import org.springframework.stereotype.Component;
|
|
|
|
|
|
import java.util.List;
|
|
import java.util.List;
|
|
@@ -23,6 +20,7 @@ import java.util.Map;
|
|
import java.util.Objects;
|
|
import java.util.Objects;
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
import java.util.concurrent.ScheduledFuture;
|
|
import java.util.concurrent.ScheduledFuture;
|
|
|
|
+import java.util.concurrent.atomic.AtomicBoolean;
|
|
import java.util.function.Consumer;
|
|
import java.util.function.Consumer;
|
|
import java.util.stream.Collectors;
|
|
import java.util.stream.Collectors;
|
|
|
|
|
|
@@ -45,11 +43,22 @@ public class MqttService implements DriverService {
|
|
* <p>{@code k:code,v:client}</p>
|
|
* <p>{@code k:code,v:client}</p>
|
|
*/
|
|
*/
|
|
private final static Map<String, ScheduledFuture<?>> SCHEDULED_CACHE = new ConcurrentHashMap<>(2048);
|
|
private final static Map<String, ScheduledFuture<?>> SCHEDULED_CACHE = new ConcurrentHashMap<>(2048);
|
|
|
|
+
|
|
|
|
+ final
|
|
|
|
+ ThreadPoolTaskScheduler taskScheduler;
|
|
|
|
+ final
|
|
|
|
+ DriverProperties properties;
|
|
|
|
+
|
|
/**
|
|
/**
|
|
* 数据消费
|
|
* 数据消费
|
|
*/
|
|
*/
|
|
private Consumer<DataInfo> dataConsumer;
|
|
private Consumer<DataInfo> dataConsumer;
|
|
|
|
|
|
|
|
+ public MqttService(ThreadPoolTaskScheduler taskScheduler, DriverProperties properties) {
|
|
|
|
+ this.taskScheduler = taskScheduler;
|
|
|
|
+ this.properties = properties;
|
|
|
|
+ }
|
|
|
|
+
|
|
/**
|
|
/**
|
|
* 获取url
|
|
* 获取url
|
|
*
|
|
*
|
|
@@ -71,6 +80,20 @@ public class MqttService implements DriverService {
|
|
equipments.stream()
|
|
equipments.stream()
|
|
.filter(equipment -> equipment != null && equipment.getEnable())
|
|
.filter(equipment -> equipment != null && equipment.getEnable())
|
|
.forEach(this::register);
|
|
.forEach(this::register);
|
|
|
|
+ //定时重新注册
|
|
|
|
+ final AtomicBoolean firstSkip = new AtomicBoolean(true);
|
|
|
|
+ taskScheduler.scheduleWithFixedDelay(() -> {
|
|
|
|
+ //跳过首次执行
|
|
|
|
+ if (firstSkip.getAndSet(false)) return;
|
|
|
|
+ //遍历设备
|
|
|
|
+ equipments.forEach(equipment -> {
|
|
|
|
+ //如不是全部连接,则重新注册
|
|
|
|
+ if (!equipment.getDestinations().stream().allMatch(this::isConnected)) {
|
|
|
|
+ refRegister(equipment);
|
|
|
|
+ log.info("监测到MQTT目标断开,重新注册,设备:{}", equipment);
|
|
|
|
+ }
|
|
|
|
+ });
|
|
|
|
+ }, properties.getCheckInterval());
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -119,6 +142,8 @@ public class MqttService implements DriverService {
|
|
|
|
|
|
/**
|
|
/**
|
|
* 注册
|
|
* 注册
|
|
|
|
+ *
|
|
|
|
+ * @param equipment 设备
|
|
*/
|
|
*/
|
|
@Override
|
|
@Override
|
|
public void register(Equipment equipment) {
|
|
public void register(Equipment equipment) {
|
|
@@ -137,11 +162,24 @@ public class MqttService implements DriverService {
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
- * {@inheritDoc}
|
|
|
|
|
|
+ * 重新注册
|
|
|
|
+ *
|
|
|
|
+ * @param equipment 设备
|
|
*/
|
|
*/
|
|
@Override
|
|
@Override
|
|
public void refRegister(Equipment equipment) {
|
|
public void refRegister(Equipment equipment) {
|
|
|
|
+ if (equipment.getEquipmentCode() == null) return;
|
|
|
|
+ //获取设备标识
|
|
|
|
+ String code = equipment.getEquipmentCode();
|
|
|
|
+ if (StringUtils.isBlank(code) || !equipment.getEnable()) return;
|
|
|
|
+ //断开连接,并重连
|
|
|
|
+ equipment.getDestinations().forEach(destination -> {
|
|
|
|
+ disconnect(destination);
|
|
|
|
+ connect(equipment, destination);
|
|
|
|
+ });
|
|
|
|
|
|
|
|
+ //重新启动任务
|
|
|
|
+ if (!equipment.getPassive()) refSchedule(equipment);
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -174,6 +212,7 @@ public class MqttService implements DriverService {
|
|
public void onSuccess(IMqttToken asyncActionToken) {
|
|
public void onSuccess(IMqttToken asyncActionToken) {
|
|
try {
|
|
try {
|
|
mqttClient.subscribe(destination.getTopic(), 2);
|
|
mqttClient.subscribe(destination.getTopic(), 2);
|
|
|
|
+ log.info("MQTT连接成功,目标:{}", destination);
|
|
} catch (MqttException e) {
|
|
} catch (MqttException e) {
|
|
log.error("MQTT客户端关闭失败,通讯标识:{},{}", destination.getHost(), destination.getPort(), e);
|
|
log.error("MQTT客户端关闭失败,通讯标识:{},{}", destination.getHost(), destination.getPort(), e);
|
|
}
|
|
}
|
|
@@ -202,7 +241,10 @@ public class MqttService implements DriverService {
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
- * {@inheritDoc}
|
|
|
|
|
|
+ * 是否连接
|
|
|
|
+ *
|
|
|
|
+ * @param destination 目标
|
|
|
|
+ * @return 连接状态
|
|
*/
|
|
*/
|
|
@Override
|
|
@Override
|
|
public Boolean isConnected(Destination destination) {
|
|
public Boolean isConnected(Destination destination) {
|