初次提交
This commit is contained in:
@ -0,0 +1,27 @@
|
||||
package com.ho.datacollect;
|
||||
|
||||
import org.mybatis.spring.annotation.MapperScan;
|
||||
import org.springframework.boot.SpringApplication;
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
import org.springframework.cache.annotation.EnableCaching;
|
||||
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
|
||||
import org.springframework.cloud.openfeign.EnableFeignClients;
|
||||
import org.springframework.scheduling.annotation.EnableAsync;
|
||||
|
||||
/**
|
||||
* @author fancl
|
||||
* @description: 数据采集
|
||||
* @date 2022/8/9
|
||||
*/
|
||||
@SpringBootApplication(scanBasePackages = {"com.ho.common.tools", "com.ho.datacollect","com.ho.business","com.ho.flow"})
|
||||
@EnableDiscoveryClient
|
||||
//连的是business数据库
|
||||
@MapperScan(basePackages = "com.ho.business.mapper")
|
||||
@EnableFeignClients(basePackages = {"com.ho.business.feignclient", "com.ho.datacollect.feignclient"})
|
||||
@EnableAsync //开启异步
|
||||
@EnableCaching
|
||||
public class DataCollectApplication {
|
||||
public static void main(String[] args) {
|
||||
SpringApplication.run(DataCollectApplication.class,args);
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,41 @@
|
||||
package com.ho.datacollect.component;
|
||||
|
||||
import com.ho.datacollect.api.vo.req.add.DataSetParam;
|
||||
import com.ho.datacollect.service.DataCollectService;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.scheduling.annotation.Async;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* @author fancl
|
||||
* @desc: 数据采集处理类
|
||||
* @date 2022/8/31
|
||||
*/
|
||||
@Component
|
||||
@Slf4j
|
||||
public class DCStrategy {
|
||||
|
||||
@Autowired
|
||||
Map<String, DataCollectService> dcStrategy;
|
||||
|
||||
//异步调用方法,并指定 数据采集模块的线程池
|
||||
@Async("dataCollectThreadPoolExecutor")
|
||||
public void strategy(String typeName, DataSetParam dataSetParam){
|
||||
log.info("DCStrategy.strategy, typeName:{},dataSetParam:{} ", typeName, null);
|
||||
/* try {
|
||||
Thread.sleep(3000);
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}*/
|
||||
if(dcStrategy.containsKey(typeName)){
|
||||
dcStrategy.get(typeName).doService(dataSetParam);
|
||||
}else{
|
||||
log.error("Not found this strategy:{}",typeName );
|
||||
log.error("=====================================");
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@ -0,0 +1,233 @@
|
||||
package com.ho.datacollect.config;
|
||||
|
||||
import cn.hutool.core.util.IdUtil;
|
||||
import com.ho.datacollect.util.AnotherMqttConfigUtil;
|
||||
import lombok.Data;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.eclipse.paho.client.mqttv3.IMqttToken;
|
||||
import org.eclipse.paho.client.mqttv3.MqttClient;
|
||||
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.boot.context.properties.ConfigurationProperties;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
||||
/**
|
||||
* @author wp
|
||||
* @desc: Mqtt配置类
|
||||
* @date 2025/06/11
|
||||
*/
|
||||
@Configuration
|
||||
@Data
|
||||
@ConfigurationProperties("mqtt1")
|
||||
@Slf4j
|
||||
public class AnotherMqttConfig {
|
||||
|
||||
//服务器url
|
||||
@Value("${mqtt1.url}")
|
||||
String url;
|
||||
//超时时间
|
||||
@Value("${mqtt1.timeout}")
|
||||
Integer timeout;
|
||||
//会话保持时间
|
||||
@Value("${mqtt1.keepAlive}")
|
||||
Integer keepAlive;
|
||||
|
||||
@Value("${mqtt1.userName}")
|
||||
String userName;
|
||||
|
||||
@Value("${mqtt1.passWord}")
|
||||
String passWord;
|
||||
|
||||
@Value("${topic.edgeLoginRequest}")
|
||||
String edgeLoginRequest;
|
||||
|
||||
@Value("${topic.edgeReadResponse}")
|
||||
String edgeReadResponse;
|
||||
|
||||
@Value("${topic.edgeWriteResponse}")
|
||||
String edgeWriteResponse;
|
||||
|
||||
@Value("${topic.edgeReportPush}")
|
||||
String edgeReportPush;
|
||||
|
||||
@Value("${topic.edgeControlResponse}")
|
||||
String edgeControlResponse;
|
||||
|
||||
@Autowired
|
||||
AnotherLoginRequestConsumer loginRequestConsumer;
|
||||
|
||||
@Autowired
|
||||
AnotherReadResponseConsumer readResponseConsumer;
|
||||
|
||||
@Autowired
|
||||
AnotherWriteResponseConsumer writeResponseConsumer;
|
||||
|
||||
@Autowired
|
||||
AnotherReportPushConsumer reportPushConsumer;
|
||||
|
||||
@Autowired
|
||||
AnotherControlResponseConsumer controlResponseConsumer;
|
||||
|
||||
//是否自动重连 实际环境要改为true
|
||||
private boolean autoReConnect = true;
|
||||
|
||||
|
||||
//登录验证的监听
|
||||
@Bean(name = "AnotherLoginRequest")
|
||||
public MqttClient initLoginRequest() {
|
||||
String clientId = IdUtil.simpleUUID();
|
||||
log.info("clientId:" +clientId);
|
||||
MqttClient client =null;
|
||||
try {
|
||||
client = new MqttClient(url, clientId,null);
|
||||
MqttConnectOptions options = new MqttConnectOptions();
|
||||
options.setUserName(userName);
|
||||
options.setPassword(passWord.toCharArray());
|
||||
options.setCleanSession(true);
|
||||
options.setConnectionTimeout(timeout);
|
||||
options.setKeepAliveInterval(keepAlive);
|
||||
options.setExecutorServiceTimeout(0);
|
||||
//options.setAutomaticReconnect(autoReConnect);
|
||||
client.setCallback(loginRequestConsumer);
|
||||
IMqttToken iMqttToken = client.connectWithResult(options);
|
||||
boolean complete = iMqttToken.isComplete();
|
||||
log.info("LoginRequestMqttClient建立连接:{}", complete);
|
||||
//这里监听的是
|
||||
String[] topic = AnotherMqttConfigUtil.getLoginRequestTopic();
|
||||
int[] qos = new int[topic.length];
|
||||
client.subscribe(topic,qos);
|
||||
log.info("已订阅topic:{}", topic);
|
||||
return client;
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
return client;
|
||||
}
|
||||
|
||||
//读取文件请求的监听
|
||||
@Bean(name = "AnotherReadResponse")
|
||||
public MqttClient initReadRequest() {
|
||||
String clientId = IdUtil.simpleUUID();
|
||||
log.info("clientId:" +clientId);
|
||||
MqttClient client =null;
|
||||
try {
|
||||
client = new MqttClient(url, clientId,null);
|
||||
MqttConnectOptions options = new MqttConnectOptions();
|
||||
options.setUserName(userName);
|
||||
options.setPassword(passWord.toCharArray());
|
||||
options.setCleanSession(true);
|
||||
options.setConnectionTimeout(timeout);
|
||||
options.setKeepAliveInterval(keepAlive);
|
||||
options.setExecutorServiceTimeout(0);
|
||||
//options.setAutomaticReconnect(autoReConnect);
|
||||
client.setCallback(readResponseConsumer);
|
||||
IMqttToken iMqttToken = client.connectWithResult(options);
|
||||
boolean complete = iMqttToken.isComplete();
|
||||
log.info("ReadRequestMqttClient建立连接:{}", complete);
|
||||
//这里监听的是
|
||||
String[] topic = AnotherMqttConfigUtil.getReadRequestTopic();
|
||||
int[] qos = new int[topic.length];
|
||||
client.subscribe(topic,qos);
|
||||
log.info("已订阅topic:{}", topic);
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
return client;
|
||||
}
|
||||
|
||||
//写文件响应的监听
|
||||
@Bean(name = "AnotherWriteResponse")
|
||||
public MqttClient initWriteRequest() {
|
||||
String clientId = IdUtil.simpleUUID();
|
||||
log.info("clientId:" +clientId);
|
||||
MqttClient client =null;
|
||||
try {
|
||||
client = new MqttClient(url, clientId,null);
|
||||
MqttConnectOptions options = new MqttConnectOptions();
|
||||
options.setUserName(userName);
|
||||
options.setPassword(passWord.toCharArray());
|
||||
options.setCleanSession(true);
|
||||
options.setConnectionTimeout(timeout);
|
||||
options.setKeepAliveInterval(keepAlive);
|
||||
options.setExecutorServiceTimeout(0);
|
||||
//options.setAutomaticReconnect(autoReConnect);
|
||||
client.setCallback(writeResponseConsumer);
|
||||
IMqttToken iMqttToken = client.connectWithResult(options);
|
||||
boolean complete = iMqttToken.isComplete();
|
||||
log.info("WriteRequestMqttClient建立连接:{}", complete);
|
||||
//这里监听的是
|
||||
String[] topic = AnotherMqttConfigUtil.getWriteRequestTopic();
|
||||
int[] qos = new int[topic.length];
|
||||
client.subscribe(topic,qos);
|
||||
log.info("已订阅topic:{}", topic);
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
return client;
|
||||
}
|
||||
|
||||
//报告上传
|
||||
@Bean(name = "AnotherReportPush")
|
||||
public MqttClient initReportPush() {
|
||||
String clientId = IdUtil.simpleUUID();
|
||||
log.info("clientId:" +clientId);
|
||||
MqttClient client =null;
|
||||
try {
|
||||
client = new MqttClient(url, clientId,null);
|
||||
MqttConnectOptions options = new MqttConnectOptions();
|
||||
options.setUserName(userName);
|
||||
options.setPassword(passWord.toCharArray());
|
||||
options.setCleanSession(true);
|
||||
options.setConnectionTimeout(timeout);
|
||||
options.setKeepAliveInterval(keepAlive);
|
||||
options.setExecutorServiceTimeout(0);
|
||||
//options.setAutomaticReconnect(autoReConnect);
|
||||
client.setCallback(reportPushConsumer);
|
||||
IMqttToken iMqttToken = client.connectWithResult(options);
|
||||
boolean complete = iMqttToken.isComplete();
|
||||
log.info("ReportPushMqttClient建立连接:{}", complete);
|
||||
//这里监听的是
|
||||
String[] topic = AnotherMqttConfigUtil.getReportPushTopic();
|
||||
int[] qos = new int[topic.length];
|
||||
client.subscribe(topic,qos);
|
||||
log.info("已订阅topic:{}", topic);
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
return client;
|
||||
}
|
||||
|
||||
//远程控制
|
||||
@Bean(name = "AnotherControlResponse")
|
||||
public MqttClient initControlResponse() {
|
||||
String clientId = IdUtil.simpleUUID();
|
||||
log.info("clientId:" +clientId);
|
||||
MqttClient client =null;
|
||||
try {
|
||||
client = new MqttClient(url, clientId,null);
|
||||
MqttConnectOptions options = new MqttConnectOptions();
|
||||
options.setUserName(userName);
|
||||
options.setPassword(passWord.toCharArray());
|
||||
options.setCleanSession(true);
|
||||
options.setConnectionTimeout(timeout);
|
||||
options.setKeepAliveInterval(keepAlive);
|
||||
options.setExecutorServiceTimeout(0);
|
||||
//options.setAutomaticReconnect(autoReConnect);
|
||||
client.setCallback(controlResponseConsumer);
|
||||
IMqttToken iMqttToken = client.connectWithResult(options);
|
||||
boolean complete = iMqttToken.isComplete();
|
||||
log.info("ControlResponseMqttClient建立连接:{}", complete);
|
||||
//这里监听的是
|
||||
String[] topic = AnotherMqttConfigUtil.getControlResponseTopic();
|
||||
int[] qos = new int[topic.length];
|
||||
client.subscribe(topic,qos);
|
||||
log.info("已订阅topic:{}", topic);
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
return client;
|
||||
}
|
||||
|
||||
}
|
||||
@ -0,0 +1,78 @@
|
||||
package com.ho.datacollect.config;
|
||||
|
||||
import com.github.benmanes.caffeine.cache.Caffeine;
|
||||
import org.springframework.cache.CacheManager;
|
||||
import org.springframework.cache.caffeine.CaffeineCacheManager;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.context.annotation.Primary;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
* @author fancl
|
||||
* @desc: Caffeine配置类
|
||||
* @date 2023/4/14
|
||||
*/
|
||||
@Configuration
|
||||
public class CaffeineConfig extends CaffeineCacheManager{
|
||||
|
||||
//10秒钟的缓存
|
||||
@Bean(name = "tenSecondCacheManager")
|
||||
@Primary
|
||||
public CacheManager tenSecondCacheManager(){
|
||||
Caffeine caffeine = Caffeine.newBuilder()
|
||||
.initialCapacity(50) //初始大小
|
||||
.maximumSize(20000) //最大大小
|
||||
.expireAfterWrite(10 ,TimeUnit.SECONDS); //写入/多久过期
|
||||
|
||||
CaffeineCacheManager caffeineCacheManager = new CaffeineCacheManager();
|
||||
caffeineCacheManager.setAllowNullValues(true);
|
||||
caffeineCacheManager.setCaffeine(caffeine);
|
||||
return caffeineCacheManager;
|
||||
}
|
||||
|
||||
//30秒钟的缓存
|
||||
@Bean(name = "thirtySecondCacheManager")
|
||||
public CacheManager thirtySecondCacheManager(){
|
||||
Caffeine caffeine = Caffeine.newBuilder()
|
||||
.initialCapacity(50) //初始大小
|
||||
.maximumSize(20000) //最大大小
|
||||
.expireAfterWrite(30 ,TimeUnit.SECONDS); //写入/多久过期
|
||||
|
||||
CaffeineCacheManager caffeineCacheManager = new CaffeineCacheManager();
|
||||
caffeineCacheManager.setAllowNullValues(true);
|
||||
caffeineCacheManager.setCaffeine(caffeine);
|
||||
return caffeineCacheManager;
|
||||
}
|
||||
|
||||
//2分钟的缓存
|
||||
@Bean(name = "towMinuteCacheManager")
|
||||
public CacheManager towMinuteCacheManager(){
|
||||
Caffeine caffeine = Caffeine.newBuilder()
|
||||
.initialCapacity(50) //初始大小
|
||||
.maximumSize(20000) //最大大小
|
||||
.expireAfterWrite(2, TimeUnit.MINUTES); //写入/多久过期
|
||||
|
||||
CaffeineCacheManager caffeineCacheManager = new CaffeineCacheManager();
|
||||
caffeineCacheManager.setAllowNullValues(true);
|
||||
caffeineCacheManager.setCaffeine(caffeine);
|
||||
return caffeineCacheManager;
|
||||
}
|
||||
|
||||
//5分钟的缓存
|
||||
@Bean(name = "fiveMinuteCacheManager")
|
||||
public CacheManager fiveMinuteCacheManager(){
|
||||
Caffeine caffeine = Caffeine.newBuilder()
|
||||
.initialCapacity(50) //初始大小
|
||||
.maximumSize(20000) //最大大小
|
||||
.expireAfterWrite(2, TimeUnit.MINUTES); //写入/多久过期
|
||||
|
||||
CaffeineCacheManager caffeineCacheManager = new CaffeineCacheManager();
|
||||
caffeineCacheManager.setAllowNullValues(true);
|
||||
caffeineCacheManager.setCaffeine(caffeine);
|
||||
return caffeineCacheManager;
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
@ -0,0 +1,228 @@
|
||||
package com.ho.datacollect.config;
|
||||
|
||||
import cn.hutool.core.util.IdUtil;
|
||||
import com.ho.datacollect.util.MqttConfigUtil;
|
||||
import lombok.Data;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.eclipse.paho.client.mqttv3.IMqttToken;
|
||||
import org.eclipse.paho.client.mqttv3.MqttClient;
|
||||
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.boot.context.properties.ConfigurationProperties;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
||||
/**
|
||||
* @author yule
|
||||
* @desc: Mqtt配置类
|
||||
* @date 2022/11/7
|
||||
*/
|
||||
@Configuration
|
||||
@Data
|
||||
@ConfigurationProperties("mqtt")
|
||||
@Slf4j
|
||||
public class MqttConfig {
|
||||
|
||||
//服务器url
|
||||
String url;
|
||||
//用户名
|
||||
String userName;
|
||||
//密码
|
||||
String passWord;
|
||||
//超时时间
|
||||
Integer timeout;
|
||||
//会话保持时间
|
||||
Integer keepAlive;
|
||||
|
||||
@Value("${topic.edgeLoginRequest}")
|
||||
String edgeLoginRequest;
|
||||
|
||||
@Value("${topic.edgeReadResponse}")
|
||||
String edgeReadResponse;
|
||||
|
||||
@Value("${topic.edgeWriteResponse}")
|
||||
String edgeWriteResponse;
|
||||
|
||||
@Value("${topic.edgeReportPush}")
|
||||
String edgeReportPush;
|
||||
|
||||
@Value("${topic.edgeControlResponse}")
|
||||
String edgeControlResponse;
|
||||
|
||||
@Autowired
|
||||
LoginRequestConsumer loginRequestConsumer;
|
||||
|
||||
@Autowired
|
||||
ReadResponseConsumer readResponseConsumer;
|
||||
|
||||
@Autowired
|
||||
WriteResponseConsumer writeResponseConsumer;
|
||||
|
||||
@Autowired
|
||||
ReportPushConsumer reportPushConsumer;
|
||||
|
||||
@Autowired
|
||||
ControlResponseConsumer controlResponseConsumer;
|
||||
|
||||
//是否自动重连 实际环境要改为true
|
||||
private boolean autoReConnect = true;
|
||||
|
||||
|
||||
//登录验证的监听
|
||||
@Bean(name = "LoginRequest")
|
||||
public MqttClient initLoginRequest() {
|
||||
String clientId = IdUtil.simpleUUID();
|
||||
log.info("clientId:" +clientId);
|
||||
MqttClient client =null;
|
||||
try {
|
||||
client = new MqttClient(url, clientId,null);
|
||||
MqttConnectOptions options = new MqttConnectOptions();
|
||||
options.setUserName(userName);
|
||||
options.setPassword(passWord.toCharArray());
|
||||
options.setCleanSession(true);
|
||||
options.setConnectionTimeout(timeout);
|
||||
options.setKeepAliveInterval(keepAlive);
|
||||
options.setExecutorServiceTimeout(0);
|
||||
//options.setAutomaticReconnect(autoReConnect);
|
||||
client.setCallback(loginRequestConsumer);
|
||||
IMqttToken iMqttToken = client.connectWithResult(options);
|
||||
boolean complete = iMqttToken.isComplete();
|
||||
log.info("LoginRequestMqttClient建立连接:{}", complete);
|
||||
//这里监听的是
|
||||
String[] topic = MqttConfigUtil.getLoginRequestTopic();
|
||||
int[] qos = new int[topic.length];
|
||||
client.subscribe(topic,qos);
|
||||
log.info("已订阅topic:{}", topic);
|
||||
return client;
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
return client;
|
||||
}
|
||||
|
||||
//读取文件请求的监听
|
||||
@Bean(name = "ReadResponse")
|
||||
public MqttClient initReadRequest() {
|
||||
String clientId = IdUtil.simpleUUID();
|
||||
log.info("clientId:" +clientId);
|
||||
MqttClient client =null;
|
||||
try {
|
||||
client = new MqttClient(url, clientId,null);
|
||||
MqttConnectOptions options = new MqttConnectOptions();
|
||||
options.setUserName(userName);
|
||||
options.setPassword(passWord.toCharArray());
|
||||
options.setCleanSession(true);
|
||||
options.setConnectionTimeout(timeout);
|
||||
options.setKeepAliveInterval(keepAlive);
|
||||
options.setExecutorServiceTimeout(0);
|
||||
//options.setAutomaticReconnect(autoReConnect);
|
||||
client.setCallback(readResponseConsumer);
|
||||
IMqttToken iMqttToken = client.connectWithResult(options);
|
||||
boolean complete = iMqttToken.isComplete();
|
||||
log.info("ReadRequestMqttClient建立连接:{}", complete);
|
||||
//这里监听的是
|
||||
String[] topic = MqttConfigUtil.getReadRequestTopic();
|
||||
int[] qos = new int[topic.length];
|
||||
client.subscribe(topic,qos);
|
||||
log.info("已订阅topic:{}", topic);
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
return client;
|
||||
}
|
||||
|
||||
//写文件响应的监听
|
||||
@Bean(name = "WriteResponse")
|
||||
public MqttClient initWriteRequest() {
|
||||
String clientId = IdUtil.simpleUUID();
|
||||
log.info("clientId:" +clientId);
|
||||
MqttClient client =null;
|
||||
try {
|
||||
client = new MqttClient(url, clientId,null);
|
||||
MqttConnectOptions options = new MqttConnectOptions();
|
||||
options.setUserName(userName);
|
||||
options.setPassword(passWord.toCharArray());
|
||||
options.setCleanSession(true);
|
||||
options.setConnectionTimeout(timeout);
|
||||
options.setKeepAliveInterval(keepAlive);
|
||||
options.setExecutorServiceTimeout(0);
|
||||
//options.setAutomaticReconnect(autoReConnect);
|
||||
client.setCallback(writeResponseConsumer);
|
||||
IMqttToken iMqttToken = client.connectWithResult(options);
|
||||
boolean complete = iMqttToken.isComplete();
|
||||
log.info("WriteRequestMqttClient建立连接:{}", complete);
|
||||
//这里监听的是
|
||||
String[] topic = MqttConfigUtil.getWriteRequestTopic();
|
||||
int[] qos = new int[topic.length];
|
||||
client.subscribe(topic,qos);
|
||||
log.info("已订阅topic:{}", topic);
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
return client;
|
||||
}
|
||||
|
||||
//报告上传
|
||||
@Bean(name = "ReportPush")
|
||||
public MqttClient initReportPush() {
|
||||
String clientId = IdUtil.simpleUUID();
|
||||
log.info("clientId:" +clientId);
|
||||
MqttClient client =null;
|
||||
try {
|
||||
client = new MqttClient(url, clientId,null);
|
||||
MqttConnectOptions options = new MqttConnectOptions();
|
||||
options.setUserName(userName);
|
||||
options.setPassword(passWord.toCharArray());
|
||||
options.setCleanSession(true);
|
||||
options.setConnectionTimeout(timeout);
|
||||
options.setKeepAliveInterval(keepAlive);
|
||||
options.setExecutorServiceTimeout(0);
|
||||
//options.setAutomaticReconnect(autoReConnect);
|
||||
client.setCallback(reportPushConsumer);
|
||||
IMqttToken iMqttToken = client.connectWithResult(options);
|
||||
boolean complete = iMqttToken.isComplete();
|
||||
log.info("ReportPushMqttClient建立连接:{}", complete);
|
||||
//这里监听的是
|
||||
String[] topic = MqttConfigUtil.getReportPushTopic();
|
||||
int[] qos = new int[topic.length];
|
||||
client.subscribe(topic,qos);
|
||||
log.info("已订阅topic:{}", topic);
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
return client;
|
||||
}
|
||||
|
||||
//远程控制
|
||||
@Bean(name = "ControlResponse")
|
||||
public MqttClient initControlResponse() {
|
||||
String clientId = IdUtil.simpleUUID();
|
||||
log.info("clientId:" +clientId);
|
||||
MqttClient client =null;
|
||||
try {
|
||||
client = new MqttClient(url, clientId,null);
|
||||
MqttConnectOptions options = new MqttConnectOptions();
|
||||
options.setUserName(userName);
|
||||
options.setPassword(passWord.toCharArray());
|
||||
options.setCleanSession(true);
|
||||
options.setConnectionTimeout(timeout);
|
||||
options.setKeepAliveInterval(keepAlive);
|
||||
options.setExecutorServiceTimeout(0);
|
||||
//options.setAutomaticReconnect(autoReConnect);
|
||||
client.setCallback(controlResponseConsumer);
|
||||
IMqttToken iMqttToken = client.connectWithResult(options);
|
||||
boolean complete = iMqttToken.isComplete();
|
||||
log.info("ControlResponseMqttClient建立连接:{}", complete);
|
||||
//这里监听的是
|
||||
String[] topic = MqttConfigUtil.getControlResponseTopic();
|
||||
int[] qos = new int[topic.length];
|
||||
client.subscribe(topic,qos);
|
||||
log.info("已订阅topic:{}", topic);
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
return client;
|
||||
}
|
||||
|
||||
}
|
||||
@ -0,0 +1,55 @@
|
||||
package com.ho.datacollect.config;
|
||||
|
||||
import com.alibaba.druid.support.http.StatViewServlet;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.boot.web.servlet.ServletRegistrationBean;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
|
||||
/**
|
||||
* @author fancl
|
||||
* @desc: 线程池配置类
|
||||
* @date 2022/9/12
|
||||
*/
|
||||
@Configuration
|
||||
@Slf4j
|
||||
public class ThreadPoolForDataCollectConfig {
|
||||
|
||||
//定义数据采集层线程池
|
||||
@Bean
|
||||
ThreadPoolTaskExecutor dataCollectThreadPoolExecutor(){
|
||||
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
|
||||
int core = Runtime.getRuntime().availableProcessors();
|
||||
log.info("core:" +core);
|
||||
executor.setCorePoolSize(core);
|
||||
//最大线程数是核心线程数10倍
|
||||
executor.setMaxPoolSize(core * 10);
|
||||
//
|
||||
executor.setKeepAliveSeconds(600);
|
||||
executor.setQueueCapacity(core * 10);
|
||||
executor.setThreadNamePrefix("td-thread-execute");
|
||||
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
|
||||
executor.initialize();
|
||||
return executor;
|
||||
}
|
||||
|
||||
@Bean
|
||||
ServletRegistrationBean regisDruid() {
|
||||
//固定写法,配置访问路径
|
||||
ServletRegistrationBean<StatViewServlet> bean = new ServletRegistrationBean<>(new StatViewServlet(), "/druid/*");
|
||||
//配置登录信息,固定写法
|
||||
HashMap<String, String> initParams = new HashMap<>();
|
||||
//账号和密码的key是固定的
|
||||
initParams.put("loginUsername", "fan");
|
||||
initParams.put("loginPassword", "111111");
|
||||
|
||||
//允许谁可以访问
|
||||
initParams.put("allow", "");
|
||||
bean.setInitParameters(initParams);
|
||||
return bean;
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,190 @@
|
||||
package com.ho.datacollect.controller;
|
||||
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
import com.ho.business.service.StationService;
|
||||
import com.ho.datacollect.api.constant.DataCollectConstant;
|
||||
import com.ho.datacollect.component.DCStrategy;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.web.bind.annotation.RequestBody;
|
||||
import org.springframework.web.bind.annotation.RequestMapping;
|
||||
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* @author fancl
|
||||
* @desc: 数据采集层Controller
|
||||
* EMS过来的数据都在这作为入口
|
||||
* @date 2022/8/29
|
||||
*/
|
||||
//@RequestMapping("datacollect")
|
||||
//@RestController
|
||||
@Slf4j
|
||||
public class DataCollectController {
|
||||
|
||||
@Autowired
|
||||
DCStrategy dcStrategy;
|
||||
|
||||
|
||||
@Autowired
|
||||
StationService stationService;
|
||||
|
||||
@RequestMapping("index")
|
||||
public void index(@RequestBody JSONObject jsonObject) {
|
||||
log.info("采集数据:{}", jsonObject);
|
||||
//todo 解密 先使用明文,后续传的是加密或压缩的
|
||||
//真实消息
|
||||
//String payloadStr = (String)jsonObject.get("payload");
|
||||
//JSONObject json = JSON.parseObject(payloadStr);
|
||||
//doTag(json);
|
||||
//模拟包
|
||||
//解析json,根据出现的标签分别处理
|
||||
doTag(jsonObject);
|
||||
|
||||
}
|
||||
|
||||
//处理标签
|
||||
private void doTag(JSONObject jsonObject) {
|
||||
//参数列表,用于向下游处理方法传递参数
|
||||
Map<String, Object> params = new LinkedHashMap<>();
|
||||
if (jsonObject.containsKey(DataCollectConstant.FREEZE_TIME)) {
|
||||
String freezeTime = (String) jsonObject.get(DataCollectConstant.FREEZE_TIME);
|
||||
params.put(DataCollectConstant.FREEZE_TIME, freezeTime);
|
||||
}
|
||||
if (jsonObject.containsKey(DataCollectConstant.DATA)) {
|
||||
//得到data
|
||||
LinkedHashMap dataMap = (LinkedHashMap) jsonObject.get(DataCollectConstant.DATA);
|
||||
|
||||
/*if (dataMap.containsKey(DataCollectConstant.STATION)) {
|
||||
//station
|
||||
LinkedHashMap stationMap = (LinkedHashMap) dataMap.get(DataCollectConstant.STATION);
|
||||
//获取到电站id
|
||||
Integer stationId = (Integer) stationMap.get(DataCollectConstant.ID);
|
||||
Station station = stationService.selectById(stationId);
|
||||
params.put(OrgConstant.GROUP_ID, station.getGroupId());
|
||||
params.put(OrgConstant.DEPT_ID, station.getDeptId());
|
||||
params.put(OrgConstant.STATION_ID, stationId);
|
||||
//电站静态数据
|
||||
if (stationMap.containsKey(DataCollectConstant.PARA)) {
|
||||
dcStrategy.strategy(DataCollectConstant.KEY_MAP.STATION, (LinkedHashMap) stationMap.get(DataCollectConstant.PARA), params);
|
||||
}
|
||||
//电站状态
|
||||
if (stationMap.containsKey(DataCollectConstant.VARIABLE)) {
|
||||
dcStrategy.strategy(DataCollectConstant.KEY_MAP.STATIONYC, (LinkedHashMap) stationMap.get(DataCollectConstant.VARIABLE), params);
|
||||
}
|
||||
//事件
|
||||
if (stationMap.containsKey(DataCollectConstant.EVENT)) {
|
||||
dcStrategy.strategy(DataCollectConstant.KEY_MAP.EVENT, (LinkedHashMap) stationMap.get(DataCollectConstant.EVENT), params);
|
||||
}
|
||||
//bay
|
||||
if (stationMap.containsKey(DataCollectConstant.BAY)) {
|
||||
//给bay和device的处理类
|
||||
LinkedHashMap bayMap = (LinkedHashMap) stationMap.get(DataCollectConstant.BAY);
|
||||
//bayId
|
||||
Integer bayId = (Integer) bayMap.get(DataCollectConstant.ID);
|
||||
params.put(DataCollectConstant.BAY, bayId);
|
||||
LinkedHashMap deviceMap = (LinkedHashMap) bayMap.get(DataCollectConstant.DEVICE);
|
||||
if (deviceMap.containsKey(DataCollectConstant.LIST)) {
|
||||
ArrayList deviceList = (ArrayList) deviceMap.get(DataCollectConstant.LIST);
|
||||
for (int i = 0; i < deviceList.size(); i++) {
|
||||
LinkedHashMap device = (LinkedHashMap) deviceList.get(i);
|
||||
//设备Id
|
||||
Integer deviceId = (Integer) device.get(DataCollectConstant.ID);
|
||||
//判断设备类型
|
||||
String deviceType = topologyService.findDeviceType((Integer) params.get(OrgConstant.STATION_ID), deviceId);
|
||||
if (deviceType == null) {
|
||||
log.error("deviceType Not Found !!,{}", deviceType);
|
||||
return;
|
||||
}
|
||||
params.put("srcId", deviceId);
|
||||
//device 设备
|
||||
if(device.containsKey(DataCollectConstant.PARA)) {
|
||||
dcStrategy.strategy(DataCollectConstant.KEY_MAP.DEVICE, device, params);
|
||||
}
|
||||
//电芯数据
|
||||
else if (DeviceTypeConstant.CELL.equals(deviceType)) {
|
||||
dcStrategy.strategy(DataCollectConstant.KEY_MAP.BATTERY_CELL, device, params);
|
||||
}
|
||||
//堆
|
||||
else if (DeviceTypeConstant.STACK.equals(deviceType)) {
|
||||
dcStrategy.strategy(DataCollectConstant.KEY_MAP.BATTERY_STACK, device, params);
|
||||
}
|
||||
//簇
|
||||
else if (DeviceTypeConstant.CLUSTER.equals(deviceType)) {
|
||||
dcStrategy.strategy(DataCollectConstant.KEY_MAP.BATTERY_CLUSTER, device, params);
|
||||
}
|
||||
//舱
|
||||
else if (DeviceTypeConstant.WAREHOUSE.equals(deviceType)) {
|
||||
dcStrategy.strategy(DataCollectConstant.KEY_MAP.WAREHOUSE, device, params);
|
||||
}
|
||||
//新能源设备-光伏逆变器
|
||||
else if (DeviceTypeConstant.PVINVERTER.equals(deviceType)) {
|
||||
dcStrategy.strategy(DataCollectConstant.KEY_MAP.PVINVERTER, device, params);
|
||||
}
|
||||
//气象
|
||||
else if (DeviceTypeConstant.METEOROLOGICAL.equals(deviceType)) {
|
||||
dcStrategy.strategy(DataCollectConstant.KEY_MAP.METE, device, params);
|
||||
}
|
||||
//通用电表
|
||||
else if (DeviceTypeConstant.UNIVERSAL_METER.equals(deviceType)) {
|
||||
dcStrategy.strategy(DataCollectConstant.KEY_MAP.UNIVERSAL_METER, device, params);
|
||||
}
|
||||
//并网统计电表
|
||||
else if (DeviceTypeConstant.GRID_METER.equals(deviceType)) {
|
||||
dcStrategy.strategy(DataCollectConstant.KEY_MAP.GRID_METER, device, params);
|
||||
}
|
||||
//厂(站)用电电表
|
||||
else if (DeviceTypeConstant.STATION_METER.equals(deviceType)) {
|
||||
dcStrategy.strategy(DataCollectConstant.KEY_MAP.STATION_METER, device, params);
|
||||
}
|
||||
//单项电表
|
||||
else if (DeviceTypeConstant.SINGLE_HASE.equals(deviceType)) {
|
||||
dcStrategy.strategy(DataCollectConstant.KEY_MAP.SINGLE_HASE, device, params);
|
||||
}
|
||||
//充电桩
|
||||
else if (deviceType.contains(DeviceTypeConstant.CHARGE_PILE)) {
|
||||
dcStrategy.strategy(DataCollectConstant.KEY_MAP.CHARGE_PILE, device, params);
|
||||
}
|
||||
//充电枪
|
||||
else if (deviceType.contains(DeviceTypeConstant.CHARGE_GUN)) {
|
||||
dcStrategy.strategy(DataCollectConstant.KEY_MAP.CHARGE_GUN, device, params);
|
||||
}
|
||||
//电能质量
|
||||
else if (DeviceTypeConstant.ELEC_QUALITY.equals(deviceType)) {
|
||||
dcStrategy.strategy(DataCollectConstant.KEY_MAP.ELEC_QUALITY, device, params);
|
||||
}
|
||||
//充电桩电表
|
||||
else if (DeviceTypeConstant.CHARGE_PILE_METER.equals(deviceType)) {
|
||||
dcStrategy.strategy(DataCollectConstant.KEY_MAP.CHARGE_PILE_METER, device, params);
|
||||
}
|
||||
//耗能表
|
||||
else if (DeviceTypeConstant.ENERGY_CONSUMPTION.equals(deviceType)) {
|
||||
dcStrategy.strategy(DataCollectConstant.KEY_MAP.ENERGY_CONSUMPTION, device, params);
|
||||
}
|
||||
}
|
||||
System.out.println();
|
||||
}
|
||||
System.out.println();
|
||||
|
||||
}
|
||||
//拓扑 todo
|
||||
if (stationMap.containsKey(DataCollectConstant.TOPOLOGY)) {
|
||||
dcStrategy.strategy(DataCollectConstant.KEY_MAP.TOPOLOGY, stationMap, params);
|
||||
}
|
||||
//电价
|
||||
if (stationMap.containsKey(DataCollectConstant.PRICE)) {
|
||||
dcStrategy.strategy(DataCollectConstant.KEY_MAP.PRICE, stationMap, params);
|
||||
}
|
||||
//数据模版
|
||||
if (stationMap.containsKey(DataCollectConstant.DATA_TEMPLATE)) {
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
*/
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@ -0,0 +1,21 @@
|
||||
package com.ho.datacollect.feignclient;
|
||||
|
||||
import com.ho.common.tools.constant.ContextConstant;
|
||||
import com.ho.flow.vo.Event;
|
||||
import com.ho.flow.vo.req.event.EventQueryReqVO;
|
||||
import org.springframework.cloud.openfeign.FeignClient;
|
||||
import org.springframework.web.bind.annotation.PostMapping;
|
||||
import org.springframework.web.bind.annotation.RequestBody;
|
||||
|
||||
/**
|
||||
* @Description Flow远程调用
|
||||
* Author xueweizhi
|
||||
* Date 2023/3/07 10:49
|
||||
*/
|
||||
@FeignClient(value = ContextConstant.FLOW_CENTER,fallback = DataCollectFlowFeignClientFallback.class,contextId = "DataCollectFlowFeignClient")
|
||||
public interface DataCollectFlowFeignClient {
|
||||
|
||||
@PostMapping(value = ContextConstant.ROOT_CONTEXT +ContextConstant.FLOW_CONTEXT + "outerApi/selectEventByParams")
|
||||
Event selectEventByParams(@RequestBody EventQueryReqVO eventQueryReqVO);
|
||||
|
||||
}
|
||||
@ -0,0 +1,24 @@
|
||||
package com.ho.datacollect.feignclient;
|
||||
|
||||
import com.ho.common.tools.exception.BaseResponseCode;
|
||||
import com.ho.common.tools.exception.BusinessException;
|
||||
import com.ho.flow.vo.Event;
|
||||
import com.ho.flow.vo.req.event.EventQueryReqVO;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* @Description 调用flow-center失败
|
||||
* Author xueweizhi
|
||||
* Date 2023/3/07 10:49
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
public class DataCollectFlowFeignClientFallback implements DataCollectFlowFeignClient {
|
||||
|
||||
@Override
|
||||
public Event selectEventByParams(EventQueryReqVO eventQueryReqVO) {
|
||||
log.error("调用 [FlowFeignClient.selectEventByParams] 异常!");
|
||||
throw new BusinessException(BaseResponseCode.FEIGN_CALL_FAIL);
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,69 @@
|
||||
package com.ho.datacollect.feignclient;
|
||||
|
||||
import com.ho.common.tools.constant.ContextConstant;
|
||||
import com.ho.common.tools.exception.DataResult;
|
||||
import com.ho.datacollect.api.vo.req.add.*;
|
||||
import com.ho.datacollect.api.vo.req.energyConsumption.TdEnergyConsumptionReq;
|
||||
import com.ho.datacollect.api.vo.req.inverter.InverterAddReq;
|
||||
import com.ho.td.api.entity.query.QueryTableVO;
|
||||
import org.springframework.cloud.openfeign.FeignClient;
|
||||
import org.springframework.web.bind.annotation.PostMapping;
|
||||
import org.springframework.web.bind.annotation.RequestBody;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* @描述 调用td-service
|
||||
* @创建时间 2021/8/9
|
||||
* @修改人
|
||||
*/
|
||||
@FeignClient(value = ContextConstant.TD_SERVICE,contextId = "DataCollectTdFeignClient")
|
||||
public interface DataCollectTdFeignClient {
|
||||
|
||||
/**
|
||||
* 新增一条一体机柜数据
|
||||
* @param device
|
||||
* @return
|
||||
*/
|
||||
@PostMapping(value = ContextConstant.TD_CONTEXT + "device001/insertDeviceSimple")
|
||||
DataResult insertDeviceSimple(@RequestBody TdDeviceAddReq device);
|
||||
|
||||
/**
|
||||
* 新增一条一体机柜数据
|
||||
* @param device
|
||||
* @return
|
||||
*/
|
||||
@PostMapping(value = ContextConstant.TD_CONTEXT + "device001/insertDeviceComplex")
|
||||
DataResult insertDeviceComplex(@RequestBody TdDeviceAddReq device);
|
||||
|
||||
//新增一条新能源光伏数据
|
||||
@PostMapping(value = ContextConstant.TD_CONTEXT + "PVEnergy/insertEnergyPVOne")
|
||||
DataResult insertEnergyPV(@RequestBody TdDevicePVEnergyReq tdAddReq);
|
||||
|
||||
//新增一条能耗表数据
|
||||
@PostMapping(value = ContextConstant.TD_CONTEXT + "unMeter/insertUnMeterOne")
|
||||
DataResult insertUnMeterOne(@RequestBody TdMeterAddReq tdAddReq);
|
||||
|
||||
//新增一条并网点统计用电表数据
|
||||
@PostMapping(value = ContextConstant.TD_CONTEXT + "GridMeter/insertGridMeterOne")
|
||||
DataResult insertGridMeterOne(@RequestBody TdMeterAddReq tdAddReq);
|
||||
|
||||
//新增一条厂(站)用电表数据
|
||||
@PostMapping(value = ContextConstant.TD_CONTEXT + "StationMeter/insertStationMeterOne")
|
||||
DataResult insertStationMeterOne(@RequestBody TdMeterAddReq tdAddReq);
|
||||
|
||||
//新增一条能耗表数据
|
||||
@PostMapping(value = ContextConstant.TD_CONTEXT + "energyConsumption/insertEnergyConsumptionOne")
|
||||
DataResult insertEnergyConsumptionOne(@RequestBody TdEnergyConsumptionReq tdAddReq);
|
||||
|
||||
//新增一条逆变器
|
||||
@PostMapping(value = ContextConstant.TD_CONTEXT + "inverter/insertInverterOne")
|
||||
DataResult insertInverterOne(@RequestBody InverterAddReq tdAddReq);
|
||||
|
||||
//新增一条通用电表数据
|
||||
@PostMapping(value = ContextConstant.TD_CONTEXT + "unMeter/insertUniversalMeterOne")
|
||||
DataResult insertUniversalMeterOne(@RequestBody TdUniversalMeterReq tdUniversalMeterReq);
|
||||
|
||||
@PostMapping(value = ContextConstant.TD_CONTEXT + "unMeter/insertWeatherStationOne")
|
||||
DataResult insertWeatherStationOne(@RequestBody TdWeatherStationReq tdWeatherStationReq);
|
||||
}
|
||||
@ -0,0 +1,82 @@
|
||||
package com.ho.datacollect.feignclient;
|
||||
|
||||
import com.ho.common.tools.exception.BaseResponseCode;
|
||||
import com.ho.common.tools.exception.BusinessException;
|
||||
import com.ho.common.tools.exception.DataResult;
|
||||
import com.ho.datacollect.api.vo.req.add.*;
|
||||
import com.ho.datacollect.api.vo.req.energyConsumption.TdEnergyConsumptionReq;
|
||||
import com.ho.datacollect.api.vo.req.inverter.InverterAddReq;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* @描述 UserClient失败回调
|
||||
* @创建时间 2021/8/9
|
||||
* @修改人
|
||||
*/
|
||||
@Component
|
||||
public class DataCollectTdFeignClientFallback implements DataCollectTdFeignClient {
|
||||
|
||||
Logger log = LoggerFactory.getLogger(getClass());
|
||||
|
||||
@Override
|
||||
public DataResult insertDeviceSimple(TdDeviceAddReq device) {
|
||||
log.error("调用 [TdClient.insertDeviceSimple] 异常!");
|
||||
throw new BusinessException(BaseResponseCode.FEIGN_CALL_FAIL);
|
||||
}
|
||||
|
||||
@Override
|
||||
public DataResult insertDeviceComplex(TdDeviceAddReq device) {
|
||||
log.error("调用 [TdClient.insertDeviceComplex] 异常!");
|
||||
throw new BusinessException(BaseResponseCode.FEIGN_CALL_FAIL);
|
||||
}
|
||||
|
||||
@Override
|
||||
public DataResult insertEnergyPV(TdDevicePVEnergyReq tdAddReq) {
|
||||
log.error("调用 [TdClient.insertEnergyPV] 异常!");
|
||||
throw new BusinessException(BaseResponseCode.FEIGN_CALL_FAIL);
|
||||
}
|
||||
|
||||
@Override
|
||||
public DataResult insertUnMeterOne(TdMeterAddReq tdAddReq) {
|
||||
log.error("调用 [TdClient.insertUnMeterOne] 异常!");
|
||||
throw new BusinessException(BaseResponseCode.FEIGN_CALL_FAIL);
|
||||
}
|
||||
|
||||
@Override
|
||||
public DataResult insertGridMeterOne(TdMeterAddReq tdAddReq) {
|
||||
log.error("调用 [TdClient.insertGridMeterOne] 异常!");
|
||||
throw new BusinessException(BaseResponseCode.FEIGN_CALL_FAIL);
|
||||
}
|
||||
|
||||
@Override
|
||||
public DataResult insertStationMeterOne(TdMeterAddReq tdAddReq) {
|
||||
log.error("调用 [TdClient.insertStationMeterOne] 异常!");
|
||||
throw new BusinessException(BaseResponseCode.FEIGN_CALL_FAIL);
|
||||
}
|
||||
|
||||
@Override
|
||||
public DataResult insertEnergyConsumptionOne(TdEnergyConsumptionReq tdAddReq) {
|
||||
log.error("调用 [TdClient.insertEnergyConsumptionOne] 异常!");
|
||||
throw new BusinessException(BaseResponseCode.FEIGN_CALL_FAIL);
|
||||
}
|
||||
|
||||
@Override
|
||||
public DataResult insertInverterOne(InverterAddReq tdAddReq) {
|
||||
log.error("调用 [TdClient.insertInverterOne] 异常!");
|
||||
throw new BusinessException(BaseResponseCode.FEIGN_CALL_FAIL);
|
||||
}
|
||||
|
||||
@Override
|
||||
public DataResult insertUniversalMeterOne(TdUniversalMeterReq tdUniversalMeterReq) {
|
||||
log.error("调用 [TdClient.insertUniversalMeterOne] 异常!");
|
||||
throw new BusinessException(BaseResponseCode.FEIGN_CALL_FAIL);
|
||||
}
|
||||
|
||||
@Override
|
||||
public DataResult insertWeatherStationOne(TdWeatherStationReq tdUniversalMeterReq) {
|
||||
log.error("调用 [TdClient.insertWeatherStationOne] 异常!");
|
||||
throw new BusinessException(BaseResponseCode.FEIGN_CALL_FAIL);
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,19 @@
|
||||
package com.ho.datacollect.service;
|
||||
|
||||
import com.ho.datacollect.api.vo.req.add.DataSetParam;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* @author fancl
|
||||
* @desc: 数据采集
|
||||
* @date 2022/8/29
|
||||
*/
|
||||
public interface DataListCollectService {
|
||||
/**
|
||||
*
|
||||
* @param dataSetParam
|
||||
*/
|
||||
void doService(List<DataSetParam> dataSetParam);
|
||||
|
||||
}
|
||||
@ -0,0 +1,24 @@
|
||||
package com.ho.datacollect.service;
|
||||
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
import com.ho.business.entity.ModelDeviceColComp;
|
||||
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* @author fancl
|
||||
* @desc: 模型转换服务类
|
||||
* 模型 和 设备模型之间的转换
|
||||
* @date 2023/2/11
|
||||
*/
|
||||
public interface ModelConvertService {
|
||||
|
||||
//根据字段配置得到入库td前的对象
|
||||
JSONObject getInputDataByDeviceType(JSONObject dataSet , List<ModelDeviceColComp> compList ,String deviceType);
|
||||
|
||||
JSONObject getInputDataByDeviceTypeForDevice001(Date updateTime, JSONObject dataSet , List<ModelDeviceColComp> compList , String deviceType);
|
||||
|
||||
//设备模型映射为平台模型实体
|
||||
JSONObject convertModelFromDevice(JSONObject dataSet ,List<ModelDeviceColComp> compList,String deviceType);
|
||||
}
|
||||
@ -0,0 +1,233 @@
|
||||
package com.ho.datacollect.service.impl;
|
||||
|
||||
import cn.hutool.core.convert.Convert;
|
||||
import cn.hutool.core.date.DateTime;
|
||||
import cn.hutool.core.date.DateUtil;
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
import com.ho.business.constant.DeviceTypeConstant;
|
||||
import com.ho.business.entity.DeviceTypeCol;
|
||||
import com.ho.business.entity.ModelDeviceColComp;
|
||||
import com.ho.business.service.ModelDeviceService;
|
||||
import com.ho.business.vo.DeviceTransfer;
|
||||
import com.ho.common.tools.constant.CommonConstant;
|
||||
import com.ho.common.tools.service.RedisService;
|
||||
import com.ho.datacollect.api.constant.DataCollectConstant;
|
||||
import com.ho.datacollect.api.vo.req.add.DataSetParam;
|
||||
import com.ho.datacollect.api.vo.req.add.TdDeviceAddReq;
|
||||
import com.ho.datacollect.feignclient.DataCollectTdFeignClient;
|
||||
import com.ho.datacollect.service.DataCollectService;
|
||||
import com.ho.datacollect.service.ModelConvertService;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.math.BigDecimal;
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* @Description 一体机柜
|
||||
* Author yule
|
||||
* Date 2023/3/9 14:05
|
||||
*/
|
||||
@Service(DataCollectConstant.KEY_MAP.DEVICE001)
|
||||
@Slf4j
|
||||
public class Device001DataCollectServiceImpl implements DataCollectService {
|
||||
|
||||
@Autowired
|
||||
DataCollectTdFeignClient tdFeignClient;
|
||||
|
||||
@Autowired
|
||||
RedisService redisService;
|
||||
|
||||
@Autowired
|
||||
ModelDeviceService modelDeviceService;
|
||||
|
||||
@Autowired
|
||||
ModelConvertService modelConvertService;
|
||||
|
||||
@Override
|
||||
public void doService(DataSetParam dataSetParam) {
|
||||
String deviceType = null;
|
||||
String modelName = null;
|
||||
try {
|
||||
//
|
||||
//String str = "{\"dataSet\":{\"YC0030\":0,\"YC0015\":0,\"YC0016\":0,\"YC0013\":21,\"YC0035\":65535,\"YC0014\":0,\"YC0036\":65535,\"YC0011\":0,\"YC0033\":65535,\"YC0012\":26,\"YC0034\":65535,\"YC0031\":1,\"YC0010\":0,\"YC0032\":0,\"YC0019\":25,\"YC0017\":0,\"YC0018\":0,\"YC0004\":0,\"YC0026\":8,\"YC0005\":5,\"YC0027\":0,\"YC0002\":5,\"YC0024\":220,\"YC0003\":45,\"YC0025\":624,\"YC0022\":75,\"YC0001\":23,\"YC0023\":1,\"YC0020\":30,\"YC0021\":80,\"YC0008\":0,\"YC0009\":0,\"YC0006\":10,\"YC0028\":0,\"YC0007\":1,\"YC0029\":0},\"deviceType\":\"11#air_condition_507\",\"deviceTypeId\":1522,\"freezeTime\":\"2024-01-22 15:53:00.216\",\"groupId\":155,\"modelName\":\"device_simple\",\"srcId\":432000015,\"stationId\":507}";
|
||||
//log.info("Device001DataCollectServiceImpl: dataSetParam:{}", JSONObject.toJSONString(dataSetParam));
|
||||
//dataSetParam = JSONObject.parseObject(str, DataSetParam.class);
|
||||
//拼接表名
|
||||
Integer groupId = dataSetParam.getGroupId();
|
||||
Integer stationId = dataSetParam.getStationId();
|
||||
Integer srcId = dataSetParam.getSrcId();
|
||||
//设备类型
|
||||
deviceType = dataSetParam.getDeviceType();
|
||||
modelName = dataSetParam.getModelName();
|
||||
//时间送过来是个字符串,要转成Date类型
|
||||
String freezeTime = dataSetParam.getFreezeTime();
|
||||
//捕获异常,如果时间转换出错,就使用现在的时间
|
||||
DateTime ts = new DateTime();
|
||||
Date createTime = new Date();
|
||||
Date updateTime = Convert.toDate(freezeTime);
|
||||
try {
|
||||
ts = DateUtil.parse(freezeTime, CommonConstant.DATE_YMD_HMSS);
|
||||
} catch (Exception e) {
|
||||
log.error("匹配入库时freezeTime格式转换时出错,电站:{},设备类型:{},freezeTime:{},", stationId, deviceType, freezeTime);
|
||||
return;
|
||||
}
|
||||
String tableName = new StringBuilder().append(groupId).append(DataCollectConstant.TABLE_LINK)
|
||||
.append(stationId).append(DataCollectConstant.TABLE_LINK).append(srcId).toString();
|
||||
//创建新增对象
|
||||
TdDeviceAddReq device = new TdDeviceAddReq();
|
||||
//共通属性
|
||||
device.setGroupId(groupId);
|
||||
device.setStationId(stationId);
|
||||
device.setSrcId(srcId);
|
||||
device.setTableName(tableName);
|
||||
//inverterAddReq.setDeviceType(deviceType);
|
||||
JSONObject dataSet = dataSetParam.getDataSet();
|
||||
log.info("Device001DataCollectServiceImpl 产生的tableName: {}, freezeTime:{} , deviceType: {},modelName:{}", tableName, freezeTime, deviceType, modelName);
|
||||
//从设备定义表中得到需要哪些数据,不是所有数据都要入库
|
||||
//需要入库的字段是根据外部设备模型查出来的字段集合
|
||||
//dataSet赋值6个
|
||||
if (dataSet == null) {
|
||||
dataSet = new JSONObject();
|
||||
}
|
||||
/*List<ModelDeviceColComp> compList = modelDeviceService.getCompListByType(CommonConstant.ModelDeviceType.DEVICE, deviceType,null);
|
||||
if (compList.isEmpty()) {
|
||||
log.error("该设备未配置映射字段");
|
||||
return;
|
||||
}*/
|
||||
//入库的数据是设备原始格式,但需要经过一轮匹配,匹配到才入库不是每个字段都入库
|
||||
//JSONObject inputData = modelConvertService.getInputDataByDeviceType(dataSet, compList, deviceType);
|
||||
//先判断缓存中是否存在
|
||||
String redisKey = deviceType + ":" + stationId + ":" + srcId;
|
||||
Map<Object, Object> hgetall = redisService.hgetall(redisKey);
|
||||
JSONObject modifyDataObject = new JSONObject();
|
||||
//2024-01-22 根据点表配置的系数和偏移量, 先乘系数 再减去偏移量
|
||||
List<DeviceTypeCol> deviceTypeCols = modelDeviceService.selectByDeviceType(deviceType);
|
||||
//根据col分组
|
||||
Map<String, List<DeviceTypeCol>> colMap = deviceTypeCols.stream().collect(Collectors.groupingBy(DeviceTypeCol::getCol));
|
||||
JSONObject finalDataSet = dataSet;
|
||||
dataSet.forEach((k, v) -> {
|
||||
//todo 更改一体机柜缓存值类型
|
||||
DeviceTransfer deviceTransfer = (DeviceTransfer) hgetall.get(k);
|
||||
if (null == deviceTransfer) {
|
||||
deviceTransfer = new DeviceTransfer();
|
||||
deviceTransfer.setIsRecovery(null);
|
||||
deviceTransfer.setEventId(null);
|
||||
}
|
||||
//如果系数不为空,偏移量不为空,则需要处理
|
||||
if (colMap.containsKey(k)) {
|
||||
List<DeviceTypeCol> deviceTypeColsV = colMap.get(k);
|
||||
if (deviceTypeColsV != null && deviceTypeColsV.size() >= 1) {
|
||||
DeviceTypeCol deviceTypeCol = deviceTypeColsV.get(0);
|
||||
BigDecimal primitive = new BigDecimal(String.valueOf(v));
|
||||
deviceTransfer.setSrcValue(primitive);
|
||||
if (deviceTypeCol.getFactor() != null) {
|
||||
log.info("系数factor:" + deviceTypeCol.getFactor());
|
||||
primitive = primitive.multiply(deviceTypeCol.getFactor());
|
||||
}
|
||||
if (deviceTypeCol.getOffsetValue() != null) {
|
||||
log.info("系数偏移量 OffsetValue:" + deviceTypeCol.getOffsetValue());
|
||||
primitive = primitive.subtract(deviceTypeCol.getOffsetValue());
|
||||
}
|
||||
if(CommonConstant.ONE.equals(deviceTypeCol.getFilterType())&&deviceTypeCol.getMaxValue()!=null&&deviceTypeCol.getMinValue()!=null){
|
||||
if(primitive.compareTo(deviceTypeCol.getMaxValue())>0 || primitive.compareTo(deviceTypeCol.getMinValue())<0){
|
||||
log.info("数据不符合设定的最大最小值要求,不更新redis缓存,redis的key:{},col:{}", redisKey, k);
|
||||
return;
|
||||
}
|
||||
}
|
||||
deviceTransfer.setValue(primitive);
|
||||
if(deviceTransfer.getUpdateTime()==null){
|
||||
deviceTransfer.setUpdateTime(updateTime);
|
||||
modifyDataObject.put(k, deviceTransfer);
|
||||
}else{
|
||||
if(deviceTransfer.getUpdateTime().getTime()<=updateTime.getTime()){
|
||||
deviceTransfer.setUpdateTime(updateTime);
|
||||
modifyDataObject.put(k, deviceTransfer);
|
||||
}
|
||||
}
|
||||
//把dataSet中的值也要根据
|
||||
finalDataSet.put(k ,primitive);
|
||||
}
|
||||
}
|
||||
//已经配置的字段并且值非空
|
||||
// Map<String,Object> map = new HashMap<>();
|
||||
// map.put(k,v);
|
||||
// map.put(DataCollectConstant.UPDATE_TIME, updateTime);
|
||||
// modifyDataObject.put(k,map);
|
||||
});
|
||||
//入时序库的对象元素是JsonObject
|
||||
JSONObject inputData = finalDataSet;
|
||||
if (inputData.isEmpty()) {
|
||||
log.error("转换后无匹配的数据字段 ,无需入库");
|
||||
return;
|
||||
}
|
||||
//时序数据对象, 把两个时间字段放进去
|
||||
inputData.put(DataCollectConstant.TS, ts);
|
||||
inputData.put(DataCollectConstant.CREATE_TIME, createTime);
|
||||
//将设备数据格式转换为平台数据格式(注意这个是平台数据格式)
|
||||
//JSONObject modelObject = modelConvertService.convertModelFromDevice(dataSet, compList, deviceType);
|
||||
//内部外部使用相同模型
|
||||
JSONObject modelObject = modifyDataObject;
|
||||
//addCommunicationStatus(dataSet, modelObject);
|
||||
String oldTs = null;
|
||||
try{
|
||||
oldTs = (String)hgetall.get(DataCollectConstant.TS);
|
||||
}catch (ClassCastException e){
|
||||
modelObject.put(DataCollectConstant.TS, DateUtil.format(ts,CommonConstant.DATE_YMD_HMSS));
|
||||
modelObject.put(DataCollectConstant.UPDATE_TIME, createTime);
|
||||
log.info("redis中ts是老的时间类型,不做适配处理,继续更新缓存中的ts以及updateTime,电站:{},设备类型:{},freezeTime:{},", stationId, deviceType, freezeTime);
|
||||
}
|
||||
if(oldTs!=null){
|
||||
DateTime oldDate = DateUtil.parse(oldTs, CommonConstant.DATE_YMD_HMSS);
|
||||
if(oldDate.getTime()<=ts.getTime()){
|
||||
modelObject.put(DataCollectConstant.TS, DateUtil.format(ts,CommonConstant.DATE_YMD_HMSS));
|
||||
//最后更新时间
|
||||
modelObject.put(DataCollectConstant.UPDATE_TIME, createTime);
|
||||
}else{
|
||||
log.info("redis中ts是时间比上送的ts更靠后,不更新redis中ts时间,电站:{},设备类型:{},freezeTime:{},", stationId, deviceType, freezeTime);
|
||||
}
|
||||
}
|
||||
//先判断缓存中是否存在
|
||||
// String redisKey = deviceType + ":" + stationId + ":" + srcId;
|
||||
Boolean flag = redisService.hasKey(redisKey);
|
||||
//缓存的数据是转换后的平台模型
|
||||
if (!flag) {
|
||||
//没数据新建一个createTime
|
||||
modelObject.put(DataCollectConstant.CREATE_TIME, createTime);
|
||||
//设备类型塞进去
|
||||
//modelObject.put(DataCollectConstant.DEVICE_TYPE, deviceType);
|
||||
}
|
||||
try{
|
||||
//将设备映射字段同时写到redis,它的值和原始字段一致
|
||||
List<ModelDeviceColComp> compList = modelDeviceService.getCompListByType(CommonConstant.ModelDeviceType.DEVICE, deviceType, null);
|
||||
JSONObject convertInputData = modelConvertService.getInputDataByDeviceTypeForDevice001(updateTime, modifyDataObject, compList, deviceType);
|
||||
log.info("映射的字段 convertInputData:" + convertInputData);
|
||||
if (!convertInputData.isEmpty()) {
|
||||
modelObject.putAll(convertInputData);
|
||||
}
|
||||
}catch (Exception e){
|
||||
log.error("查询映射失败,请检查:{}", deviceType);
|
||||
}
|
||||
//log.info("modelObject: {}", modelObject);
|
||||
if(modelObject.size()>0){
|
||||
redisService.hmset(redisKey, modelObject);
|
||||
}
|
||||
//log.info("inputData:{},", inputData);
|
||||
device.setDevice(inputData);
|
||||
//指定超级表
|
||||
if (DeviceTypeConstant.DEVICE_COMPLEX.equals(modelName)) {
|
||||
tdFeignClient.insertDeviceComplex(device);
|
||||
} else if (DeviceTypeConstant.DEVICE_SIMPLE.equals(modelName)) {
|
||||
tdFeignClient.insertDeviceSimple(device);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
log.error("添加:{} 失败: {}", deviceType, e.getMessage());
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,136 @@
|
||||
package com.ho.datacollect.service.impl;
|
||||
|
||||
import com.ho.business.constant.DeviceTypeConstant;
|
||||
import com.ho.business.entity.Device;
|
||||
import com.ho.business.entity.DeviceCall;
|
||||
import com.ho.business.entity.Topology;
|
||||
import com.ho.business.service.DeviceCallService;
|
||||
import com.ho.business.service.DeviceService;
|
||||
import com.ho.common.tools.constant.CommonConstant;
|
||||
import com.ho.common.tools.constant.RedisKeyConstant;
|
||||
import com.ho.datacollect.api.constant.DataCollectConstant;
|
||||
import com.ho.datacollect.api.vo.req.add.DataSetParam;
|
||||
import com.ho.datacollect.service.DataCollectService;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.redisson.api.RLock;
|
||||
import org.redisson.api.RedissonClient;
|
||||
import org.springframework.beans.BeanUtils;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* @author gyan
|
||||
* @desc: 设备入库
|
||||
* @DateTime: 2022/10/13 15:25
|
||||
*/
|
||||
@Service(DataCollectConstant.KEY_MAP.DEVICE)
|
||||
@Slf4j
|
||||
public class DeviceDataCollectServiceImpl implements DataCollectService {
|
||||
|
||||
@Autowired
|
||||
DeviceService deviceService;
|
||||
|
||||
@Autowired
|
||||
DeviceCallService deviceCallService;
|
||||
|
||||
@Autowired
|
||||
RedissonClient redissonClient;
|
||||
|
||||
@Override
|
||||
public void doService(DataSetParam dataSetParam) {
|
||||
log.info("DeviceDataCollectServiceImpl===============:{}", dataSetParam);
|
||||
RLock lock = null;
|
||||
try {
|
||||
if (dataSetParam == null) {
|
||||
return;
|
||||
}
|
||||
//设备数据来自于 topology,所以类型也是Topology
|
||||
List<Topology> topologyList = dataSetParam.getList();
|
||||
Integer groupId = dataSetParam.getGroupId();
|
||||
Integer stationId = dataSetParam.getStationId();
|
||||
if (topologyList.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
lock = redissonClient.getLock(RedisKeyConstant.LOCK_KEY_DEVICE);
|
||||
//获取锁,没有获取到锁的阻塞在lock.lock() 这行
|
||||
lock.lock();
|
||||
String sn = dataSetParam.getSerialNo();
|
||||
Integer pid = 0;
|
||||
//拓扑转成device
|
||||
for (Topology topology : topologyList) {
|
||||
//根据stationId和设备id查,如果有数据就忽略,否则新增
|
||||
log.info("DeviceDataCollectServiceImpl===============:{},{}", stationId, topology.getId());
|
||||
DeviceCall deviceCall = deviceCallService.selectByStationIdAndSrcId(stationId, topology.getId());
|
||||
if (deviceCall != null) {
|
||||
continue;
|
||||
}
|
||||
log.info("DeviceDataCollectServiceImpl===============deviceCall:{}", deviceCall);
|
||||
//此时device 还未初始化,所以先初始化
|
||||
deviceCall = new DeviceCall();
|
||||
//新增,先查设备类型
|
||||
//根据类型三要素查typeContract查 deviceTypeId ,然后再去device_type查 类型
|
||||
//修改为手工维护,并与devic_type_config表相关,此处拓扑表舍弃
|
||||
// Topology topologyQuery = new Topology();
|
||||
// topologyQuery.setCategory(topology.getCategory());
|
||||
// topologyQuery.setType(topology.getType());
|
||||
// topologyQuery.setSubType(topology.getSubType());
|
||||
// TypeContrast typeContract = typeContrastService.getTypeContract(topologyQuery);
|
||||
// //继续查deviceType
|
||||
// if (typeContract != null) {
|
||||
// DeviceType one = deviceTypeService.getOne(typeContract.getDeviceTypeId());
|
||||
// deviceCall.setDeviceType(one.getDeviceType());
|
||||
// }
|
||||
//父节点为0的时候代表电站
|
||||
if(CommonConstant.ZERO.equals(topology.getPid())){
|
||||
pid = topology.getId();
|
||||
}
|
||||
deviceCall.setSerialNo(sn);
|
||||
deviceCall.setGroupId(groupId);
|
||||
deviceCall.setStationId(stationId);
|
||||
deviceCall.setSrcId(topology.getId());
|
||||
deviceCall.setDeviceName(topology.getName());
|
||||
deviceCall.setPid(topology.getPid());
|
||||
deviceCall.setCategory(topology.getCategory());
|
||||
deviceCall.setCreateTime(new Date());
|
||||
//status 先传3
|
||||
deviceCall.setStatus(3);
|
||||
deviceCallService.insertSelective(deviceCall);
|
||||
Device device = new Device();
|
||||
BeanUtils.copyProperties(deviceCall,device);
|
||||
deviceService.insertSelective(device);
|
||||
}
|
||||
//将站点也作为一个设备存储到表里面
|
||||
DeviceCall deviceCall = deviceCallService.selectByStationIdAndSrcId(stationId, CommonConstant.ZERO);
|
||||
if (deviceCall == null) {
|
||||
deviceCall = new DeviceCall();
|
||||
deviceCall.setSerialNo(sn);
|
||||
deviceCall.setGroupId(groupId);
|
||||
deviceCall.setStationId(stationId);
|
||||
deviceCall.setSrcId(CommonConstant.ZERO);
|
||||
deviceCall.setDeviceName(CommonConstant.ACCESSPOINT);
|
||||
deviceCall.setDeviceType(DeviceTypeConstant.ACCESSPOINT);
|
||||
deviceCall.setPid(pid);
|
||||
deviceCall.setCreateTime(new Date());
|
||||
//status 先传3
|
||||
deviceCall.setStatus(3);
|
||||
log.info("准备添加接入站点进入设备表");
|
||||
deviceCallService.insertSelective(deviceCall);
|
||||
Device device = new Device();
|
||||
BeanUtils.copyProperties(deviceCall,device);
|
||||
deviceService.insertSelective(device);
|
||||
}
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("添加设备数据失败:{}",e.getMessage());
|
||||
}finally {
|
||||
if (lock != null && lock.isHeldByCurrentThread()) {
|
||||
lock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,52 @@
|
||||
package com.ho.datacollect.service.impl;
|
||||
|
||||
import com.ho.business.service.ElecPriceService;
|
||||
import com.ho.datacollect.api.constant.DataCollectConstant;
|
||||
import com.ho.datacollect.api.vo.req.add.DataSetParam;
|
||||
import com.ho.datacollect.feignclient.DataCollectTdFeignClient;
|
||||
import com.ho.datacollect.service.DataCollectService;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
/**
|
||||
* @author GuYan
|
||||
* @version 1.0.0
|
||||
* @ClassName ElecPriceCellDataConllectServiseImpl.java
|
||||
* @Description 电价数据入库
|
||||
* @createTime 2022年09月09日 11:04:00
|
||||
*/
|
||||
@Service(DataCollectConstant.KEY_MAP.PRICE)
|
||||
@Slf4j
|
||||
public class ElecPriceCellDataConllectServiseImpl implements DataCollectService {
|
||||
@Autowired
|
||||
ElecPriceService elecPriceService;
|
||||
|
||||
|
||||
@Autowired
|
||||
DataCollectTdFeignClient tdFeignClient;
|
||||
|
||||
@Override
|
||||
public void doService(DataSetParam dataSetParam) {
|
||||
/*try {
|
||||
if (linkedHashMap == null || linkedHashMap.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
//取电站id
|
||||
Integer stationId = (Integer) params.get(OrgConstant.STATION_ID);
|
||||
//取电价price
|
||||
JSONArray jsonArray = (JSONArray) linkedHashMap.get(DataCollectConstant.LIST);
|
||||
List<ElecPrice> elecPrices = jsonArray.toJavaList(ElecPrice.class);
|
||||
if (!elecPrices.isEmpty()) {
|
||||
for (ElecPrice elecPrice : elecPrices) {
|
||||
elecPrice.setStationId(stationId);
|
||||
elecPriceService.insertSelective(elecPrice);
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("添加电价数据失败");
|
||||
}*/
|
||||
|
||||
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,117 @@
|
||||
package com.ho.datacollect.service.impl;
|
||||
|
||||
import cn.hutool.core.date.DateTime;
|
||||
import cn.hutool.core.date.DateUtil;
|
||||
import cn.hutool.core.lang.Snowflake;
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
import com.ho.business.common.BusiTool;
|
||||
import com.ho.business.feignclient.FlowFeignClient;
|
||||
import com.ho.business.service.StationService;
|
||||
import com.ho.business.service.TopologyService;
|
||||
import com.ho.common.tools.constant.CommonConstant;
|
||||
import com.ho.common.tools.service.RedisService;
|
||||
import com.ho.datacollect.api.constant.DataCollectConstant;
|
||||
import com.ho.datacollect.api.vo.req.add.DataSetParam;
|
||||
import com.ho.datacollect.service.DataCollectService;
|
||||
import com.ho.flow.vo.req.event.EventAddReq;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
/**
|
||||
* @author fancl
|
||||
* @desc: 事件采集处理类
|
||||
* @date 2022/8/31
|
||||
*/
|
||||
@Service(DataCollectConstant.KEY_MAP.EVENT)
|
||||
@Slf4j
|
||||
public class EventDataCollectServiceImpl implements DataCollectService {
|
||||
|
||||
@Autowired
|
||||
FlowFeignClient flowFeignClient;
|
||||
|
||||
@Autowired
|
||||
StationService stationService;
|
||||
|
||||
@Autowired
|
||||
TopologyService topologyService;
|
||||
|
||||
@Autowired
|
||||
BusiTool busiTool;
|
||||
|
||||
@Autowired
|
||||
Snowflake snowflake;
|
||||
|
||||
@Autowired
|
||||
RedisService redisService;
|
||||
|
||||
//发送到告警中心匹配处理
|
||||
//异步调用方法,并指定 数据采集模块的线程池
|
||||
//@Async("dataCollectThreadPoolExecutor")
|
||||
@Override
|
||||
public void doService(DataSetParam dataSetParam) {
|
||||
log.info("EventDataCollectServiceImpl===============");
|
||||
try {
|
||||
if (dataSetParam == null ) {
|
||||
return;
|
||||
}
|
||||
Integer groupId = dataSetParam.getGroupId();
|
||||
Integer stationId = dataSetParam.getStationId();
|
||||
Integer srcId = dataSetParam.getSrcId();
|
||||
String freezeTime = dataSetParam.getFreezeTime();
|
||||
String deviceType = dataSetParam.getDeviceType();
|
||||
Integer deviceTypeId =dataSetParam.getDeviceTypeId();
|
||||
//捕获异常,如果时间转换出错,就使用现在的时间
|
||||
DateTime ts = new DateTime();
|
||||
try {
|
||||
ts = DateUtil.parse(freezeTime, CommonConstant.DATE_YMD_HMSS);
|
||||
} catch (Exception e) {
|
||||
log.error("freezeTime 格式转换时出错");
|
||||
}
|
||||
JSONObject dataSet = dataSetParam.getDataSet();
|
||||
//一次一个对象,但这个对象有多个属性
|
||||
EventAddReq eventAddReq = new EventAddReq();
|
||||
eventAddReq.setTs(ts);
|
||||
eventAddReq.setGroupId(groupId);
|
||||
eventAddReq.setStationId(stationId);
|
||||
eventAddReq.setSrcId(srcId);
|
||||
eventAddReq.setDeviceType(deviceType);
|
||||
eventAddReq.setDeviceName(dataSetParam.getDeviceName());
|
||||
eventAddReq.setDeviceTypeId(deviceTypeId);
|
||||
eventAddReq.setDataSet(dataSet);
|
||||
//发送,后续可以改为异步 或者 消息队列实现
|
||||
flowFeignClient.addEvents(eventAddReq);
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("添加告警事件数据失败: " + e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
//转工单的接口, 先不调用
|
||||
/* private void generateWorkOrder(Event e, List<WorkOrder> workOrders, List<WorkOrderCirculation> workOrderCirculations) {
|
||||
//工单对象赋值
|
||||
WorkOrder workOrder = new WorkOrder();
|
||||
//生成工单号
|
||||
String workOrderId = busiTool.getNextWorkOrderId(WorkOrderConstant.IdTypeWorkOrder);
|
||||
workOrder.setId(snowflake.nextId());
|
||||
workOrder.setOrderId(workOrderId);
|
||||
workOrder.setStationId(e.getStationId());
|
||||
workOrder.setGroupId(e.getGroupId());
|
||||
workOrder.setDeptId(e.getDeptId());
|
||||
workOrder.setDesc(e.getDescription());
|
||||
workOrder.setEventId(e.getId());
|
||||
workOrder.setEventLevel(e.getEventLevel());
|
||||
workOrder.setCreateTime(e.getCreateTime());
|
||||
workOrder.setFromId(WorkOrderConstant.autoTransferOrderId);
|
||||
workOrder.setWorkOrderStatus(WorkOrderConstant.WorkOrderStatus.toBeAllocate);
|
||||
workOrder.setWorkOrderType(WorkOrderConstant.WorkOrderType.autoRelate);
|
||||
workOrders.add(workOrder);
|
||||
//工单流转对象赋值
|
||||
WorkOrderCirculation workOrderCirculation = new WorkOrderCirculation();
|
||||
BeanUtils.copyProperties(workOrder, workOrderCirculation, "id");
|
||||
workOrderCirculation.setId(snowflake.nextId());
|
||||
workOrderCirculation.setWorkOrderAction(WorkOrderConstant.WorkOrderAction.workOrderCreate);
|
||||
workOrderCirculations.add(workOrderCirculation);
|
||||
}*/
|
||||
|
||||
}
|
||||
@ -0,0 +1,101 @@
|
||||
package com.ho.datacollect.service.impl;
|
||||
|
||||
import cn.hutool.core.date.DateTime;
|
||||
import cn.hutool.core.date.DateUtil;
|
||||
import cn.hutool.core.lang.Snowflake;
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
import com.ho.business.common.BusiTool;
|
||||
import com.ho.business.feignclient.FlowFeignClient;
|
||||
import com.ho.business.service.StationService;
|
||||
import com.ho.business.service.TopologyService;
|
||||
import com.ho.common.tools.constant.CommonConstant;
|
||||
import com.ho.common.tools.service.RedisService;
|
||||
import com.ho.datacollect.api.constant.DataCollectConstant;
|
||||
import com.ho.datacollect.api.vo.req.add.DataSetParam;
|
||||
import com.ho.datacollect.service.DataCollectService;
|
||||
import com.ho.flow.vo.req.event.EventAddReq;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.scheduling.annotation.Async;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* @author fancl
|
||||
* @desc: 事件采集处理类
|
||||
* @date 2022/8/31
|
||||
*/
|
||||
@Service(DataCollectConstant.KEY_MAP.EVENTS)
|
||||
@Slf4j
|
||||
public class EventDataListCollectServiceImpl implements DataCollectService {
|
||||
|
||||
@Autowired
|
||||
FlowFeignClient flowFeignClient;
|
||||
|
||||
@Autowired
|
||||
StationService stationService;
|
||||
|
||||
@Autowired
|
||||
TopologyService topologyService;
|
||||
|
||||
@Autowired
|
||||
BusiTool busiTool;
|
||||
|
||||
@Autowired
|
||||
Snowflake snowflake;
|
||||
|
||||
@Autowired
|
||||
RedisService redisService;
|
||||
|
||||
//发送到告警中心匹配处理
|
||||
//异步调用方法,并指定 数据采集模块的线程池
|
||||
@Async("dataCollectThreadPoolExecutor")
|
||||
@Override
|
||||
public void doService(DataSetParam dataParam) {
|
||||
log.info("EventDataListCollectServiceImpl===============");
|
||||
try {
|
||||
if (dataParam == null ) {
|
||||
return;
|
||||
}
|
||||
List<DataSetParam> dataSetParamList = dataParam.getDataSetParamList();
|
||||
List<EventAddReq> eventAddReqList = new ArrayList<>();
|
||||
for (DataSetParam dataSetParam:dataSetParamList) {
|
||||
Integer groupId = dataSetParam.getGroupId();
|
||||
Integer stationId = dataSetParam.getStationId();
|
||||
Integer srcId = dataSetParam.getSrcId();
|
||||
String freezeTime = dataSetParam.getFreezeTime();
|
||||
String deviceType = dataSetParam.getDeviceType();
|
||||
Integer deviceTypeId =dataSetParam.getDeviceTypeId();
|
||||
//捕获异常,如果时间转换出错,就使用现在的时间
|
||||
DateTime ts = new DateTime();
|
||||
try {
|
||||
ts = DateUtil.parse(freezeTime, CommonConstant.DATE_YMD_HMSS);
|
||||
} catch (Exception e) {
|
||||
log.error("匹配告警freezeTime格式转换时出错,电站:{},设备类型:{},freezeTime:{}", stationId, deviceType, freezeTime);
|
||||
return;
|
||||
}
|
||||
JSONObject dataSet = dataSetParam.getDataSet();
|
||||
//一次一个对象,但这个对象有多个属性
|
||||
EventAddReq eventAddReq = new EventAddReq();
|
||||
eventAddReq.setTs(ts);
|
||||
eventAddReq.setGroupId(groupId);
|
||||
eventAddReq.setStationId(stationId);
|
||||
eventAddReq.setSrcId(srcId);
|
||||
eventAddReq.setDeviceType(deviceType);
|
||||
eventAddReq.setDeviceTypeId(deviceTypeId);
|
||||
eventAddReq.setDeviceName(dataSetParam.getDeviceName());
|
||||
eventAddReq.setDataSet(dataSet);
|
||||
eventAddReqList.add(eventAddReq);
|
||||
}
|
||||
//发送,后续可以改为异步 或者 消息队列实现
|
||||
|
||||
flowFeignClient.addEventsList(eventAddReqList);
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("添加告警事件数据失败: " + e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@ -0,0 +1,179 @@
|
||||
package com.ho.datacollect.service.impl;
|
||||
|
||||
import cn.hutool.core.convert.Convert;
|
||||
import cn.hutool.core.date.DateTime;
|
||||
import cn.hutool.core.date.DateUtil;
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
import com.ho.business.entity.ModelDeviceColComp;
|
||||
import com.ho.business.service.ModelDeviceService;
|
||||
import com.ho.business.vo.DeviceTransfer;
|
||||
import com.ho.common.tools.constant.CommonConstant;
|
||||
import com.ho.common.tools.service.RedisService;
|
||||
import com.ho.common.tools.util.UpdateObjectUtil;
|
||||
import com.ho.datacollect.api.constant.DataCollectConstant;
|
||||
import com.ho.datacollect.api.vo.req.add.DataSetParam;
|
||||
import com.ho.datacollect.api.vo.req.inverter.InverterAddReq;
|
||||
import com.ho.datacollect.feignclient.DataCollectTdFeignClient;
|
||||
import com.ho.datacollect.service.DataCollectService;
|
||||
import com.ho.datacollect.service.ModelConvertService;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.math.BigDecimal;
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* @Description 光伏逆变器(
|
||||
* 这个处理类是所有逆变器的统一入口
|
||||
* Author yule
|
||||
* Date 2023/1/5 18:54
|
||||
*/
|
||||
@Service(DataCollectConstant.KEY_MAP.INVERTER)
|
||||
@Slf4j
|
||||
public class InverterDataCollectServiceImpl implements DataCollectService {
|
||||
|
||||
@Autowired
|
||||
DataCollectTdFeignClient tdFeignClient;
|
||||
|
||||
@Autowired
|
||||
RedisService redisService;
|
||||
|
||||
@Autowired
|
||||
UpdateObjectUtil updateObjectUtil;
|
||||
|
||||
@Autowired
|
||||
ModelConvertService modelConvertService;
|
||||
|
||||
@Autowired
|
||||
ModelDeviceService modelDeviceService;
|
||||
|
||||
//异步调用方法,并指定 数据采集模块的线程池
|
||||
//@Async("dataCollectThreadPoolExecutor")
|
||||
@Override
|
||||
public void doService(DataSetParam dataSetParam) {
|
||||
String deviceType = null;
|
||||
try {
|
||||
//
|
||||
log.info("InverterDataCollectServiceImpl: dataSetParam:{}", dataSetParam);
|
||||
//拼接表名
|
||||
Integer groupId = dataSetParam.getGroupId();
|
||||
Integer stationId = dataSetParam.getStationId();
|
||||
Integer srcId = dataSetParam.getSrcId();
|
||||
//设备类型
|
||||
deviceType = dataSetParam.getDeviceType();
|
||||
//时间送过来是个字符串,要转成Date类型
|
||||
String freezeTime = dataSetParam.getFreezeTime();
|
||||
//捕获异常,如果时间转换出错,就使用现在的时间
|
||||
DateTime ts = new DateTime();
|
||||
Date createTime = new Date();
|
||||
Date updateTime = Convert.toDate(freezeTime);
|
||||
try {
|
||||
ts = DateUtil.parse(freezeTime, CommonConstant.DATE_YMD_HMSS);
|
||||
} catch (Exception e) {
|
||||
log.error("freezeTime 格式转换时出错");
|
||||
}
|
||||
String tableName = new StringBuilder().append(groupId).append(DataCollectConstant.TABLE_LINK)
|
||||
.append(stationId).append(DataCollectConstant.TABLE_LINK).append(srcId).toString();
|
||||
log.info("产生的tableName: {}", tableName);
|
||||
//创建新增对象
|
||||
InverterAddReq inverterAddReq = new InverterAddReq();
|
||||
//共通属性
|
||||
inverterAddReq.setGroupId(groupId);
|
||||
inverterAddReq.setStationId(stationId);
|
||||
inverterAddReq.setSrcId(srcId);
|
||||
inverterAddReq.setTableName(tableName);
|
||||
inverterAddReq.setDeviceType(deviceType);
|
||||
JSONObject dataSet = dataSetParam.getDataSet();
|
||||
log.info("deviceType: {}", deviceType);
|
||||
log.info("dataSet: {} ", dataSet);
|
||||
|
||||
//从设备定义表中得到需要哪些数据,不是所有数据都要入库
|
||||
//需要入库的字段是根据外部设备模型查出来的字段集合
|
||||
//dataSet赋值6个
|
||||
if (dataSet == null) {
|
||||
dataSet = new JSONObject();
|
||||
}
|
||||
//缓存中的值更新状态
|
||||
JSONObject modifyDataObject = new JSONObject();
|
||||
dataSet.forEach((k, v) -> {
|
||||
DeviceTransfer deviceTransfer = new DeviceTransfer();
|
||||
deviceTransfer.setValue(new BigDecimal(String.valueOf(v)));
|
||||
deviceTransfer.setUpdateTime(updateTime);
|
||||
//通讯故障保持原有状态
|
||||
if(DataCollectConstant.COMMUNICATION_STATUS.equals(k)){
|
||||
modifyDataObject.put(k,v);
|
||||
}else{
|
||||
modifyDataObject.put(k,deviceTransfer);
|
||||
}
|
||||
});
|
||||
|
||||
List<ModelDeviceColComp> compList = modelDeviceService.getCompListByType(CommonConstant.ModelDeviceType.DEVICE, deviceType, null);
|
||||
// if (compList.isEmpty()) {
|
||||
// log.error("该设备未配置映射字段");
|
||||
// return;
|
||||
// }
|
||||
//入库的数据是设备原始格式,但需要经过一轮匹配,匹配到才入库不是每个字段都入库
|
||||
JSONObject inputData = modelConvertService.getInputDataByDeviceType(dataSet, compList, deviceType);
|
||||
// List<DeviceTypeCol> deviceTypeCols = modelDeviceService.selectByDeviceTypeAndIsShow(deviceType, CommonConstant.ONE);
|
||||
// boolean isMatch = inputData.isEmpty() && (deviceTypeCols != null && deviceTypeCols.size() == 0);
|
||||
// if (isMatch) {
|
||||
// log.error("转换后无匹配的数据字段 ,无需入库");
|
||||
// return;
|
||||
// }
|
||||
//时序数据对象, 把两个时间字段放进去
|
||||
inputData.put(DataCollectConstant.TS, ts);
|
||||
inputData.put(DataCollectConstant.CREATE_TIME, createTime);
|
||||
//将设备数据格式转换为平台数据格式(注意这个是平台数据格式)
|
||||
JSONObject modelObject = modifyDataObject;
|
||||
JSONObject convertInputData = modelConvertService.convertModelFromDevice(modifyDataObject, compList, deviceType);
|
||||
log.info("映射的字段 convertInputData:" + convertInputData);
|
||||
if(!convertInputData.isEmpty()){
|
||||
modelObject.putAll(convertInputData);
|
||||
}
|
||||
addCommunicationStatus(dataSet, modelObject);
|
||||
DeviceTransfer deviceTransfer = (DeviceTransfer) modelObject.get(DataCollectConstant.CUMULATIVE_POWER_GENERATION);
|
||||
Integer communicationStatus = (Integer) modelObject.get(DataCollectConstant.COMMUNICATION_STATUS);
|
||||
if (deviceTransfer != null && CommonConstant.ONE.equals(communicationStatus) && BigDecimal.ZERO.compareTo(deviceTransfer.getValue()) == 0) {
|
||||
log.error("通信状态正常且总发电量数据为0,需要丢弃;{},{}", communicationStatus, deviceTransfer.getValue());
|
||||
return;
|
||||
}
|
||||
modelObject.put(DataCollectConstant.TS, ts);
|
||||
//最后更新时间
|
||||
modelObject.put(DataCollectConstant.UPDATE_TIME, createTime);
|
||||
//先判断缓存中是否存在
|
||||
String redisKey = deviceType + ":" + stationId + ":" + srcId;
|
||||
Boolean flag = redisService.hasKey(redisKey);
|
||||
//缓存的数据是转换后的平台模型
|
||||
if (!flag) {
|
||||
//没数据新建一个createTime
|
||||
modelObject.put(DataCollectConstant.CREATE_TIME, createTime);
|
||||
//设备类型塞进去
|
||||
modelObject.put(DataCollectConstant.DEVICE_TYPE, deviceType);
|
||||
}
|
||||
log.info("modelObject: {}", modelObject);
|
||||
redisService.hmset(redisKey, modelObject);
|
||||
//入库时候调用不同逆变器入库方法,因为数据格式不同
|
||||
inputData.putAll(dataSet);
|
||||
log.info("inputData:{},", inputData);
|
||||
inverterAddReq.setInverter(inputData);
|
||||
tdFeignClient.insertInverterOne(inverterAddReq);
|
||||
} catch (Exception e) {
|
||||
log.error("添加逆变器:{} 失败: {}", deviceType, e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 往缓存中增加通讯故障状态
|
||||
*
|
||||
* @param dataSet
|
||||
* @param modelObject
|
||||
*/
|
||||
private void addCommunicationStatus(JSONObject dataSet, JSONObject modelObject) {
|
||||
Integer communicationStatus = (Integer) modelObject.get(DataCollectConstant.COMMUNICATION_STATUS);
|
||||
if (null == communicationStatus) {
|
||||
modelObject.put(DataCollectConstant.COMMUNICATION_STATUS, dataSet.get(DataCollectConstant.COMMUNICATION_STATUS) == null ? CommonConstant.ONE : dataSet.get(DataCollectConstant.COMMUNICATION_STATUS));
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,107 @@
|
||||
package com.ho.datacollect.service.impl;
|
||||
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
import com.ho.business.entity.ModelDeviceColComp;
|
||||
import com.ho.business.vo.DeviceTransfer;
|
||||
import com.ho.datacollect.api.constant.DataCollectConstant;
|
||||
import com.ho.datacollect.service.ModelConvertService;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.util.Date;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* @author fancl
|
||||
* @desc: 模型转换服务类
|
||||
* @date 2023/2/11
|
||||
*/
|
||||
@Service
|
||||
@Slf4j
|
||||
public class ModelConvertServiceImpl implements ModelConvertService {
|
||||
|
||||
|
||||
//根据模型映射得到需要入库的数据对象
|
||||
@Override
|
||||
public JSONObject getInputDataByDeviceType(JSONObject dataSet, List<ModelDeviceColComp> compList, String deviceType) {
|
||||
//需要新增的对象数据
|
||||
JSONObject inputData = new JSONObject();
|
||||
//设备配置字段,只需要字段名
|
||||
Map<String, String> deviceColMap = new HashMap<>();
|
||||
for (ModelDeviceColComp comp : compList) {
|
||||
deviceColMap.put(comp.getDeviceCol(), comp.getDeviceCol());
|
||||
}
|
||||
//只保留col字段
|
||||
//把配置的数据转为Map效率更高
|
||||
|
||||
//过滤需要入库的字段
|
||||
dataSet.forEach((k, v) -> {
|
||||
//已经配置的字段并且值非空
|
||||
if (deviceColMap.containsKey(k) && v != null) {
|
||||
//使用映射后的模型字段
|
||||
String modelCol = deviceColMap.get(k);
|
||||
inputData.put(modelCol, v);
|
||||
}
|
||||
});
|
||||
//这个过滤的目的是只接受有用的字段
|
||||
log.info("入库前数据为: {} ,", inputData);
|
||||
return inputData;
|
||||
}
|
||||
|
||||
@Override
|
||||
public JSONObject getInputDataByDeviceTypeForDevice001(Date updateTime, JSONObject dataSet, List<ModelDeviceColComp> compList, String deviceType) {
|
||||
JSONObject modelObject = new JSONObject();
|
||||
//根据映射表中的数据字段进行一一转换, 为提高检索效率,先把配置数据映射为 device_col为key的map
|
||||
if (dataSet == null || compList == null || compList.isEmpty()) {
|
||||
return modelObject;
|
||||
}
|
||||
Map<String, ModelDeviceColComp> convertMap = compList.stream().collect(Collectors.toMap(ModelDeviceColComp::getDeviceCol, ModelDeviceColComp -> ModelDeviceColComp));
|
||||
//过滤需要入库的字段
|
||||
dataSet.forEach((k, v) -> {
|
||||
//已经配置的字段并且值非空
|
||||
if (convertMap.containsKey(k) && v != null) {
|
||||
//todo 更改一体机柜缓存值类型
|
||||
ModelDeviceColComp comp = convertMap.get(k);
|
||||
String modelCol = comp.getModelCol();
|
||||
modelObject.put(modelCol, v);
|
||||
|
||||
// HashMap<String,Object> map = (HashMap<String,Object>)v;
|
||||
// //拿到映射实体
|
||||
// ModelDeviceColComp comp = convertMap.get(k);
|
||||
// String modelCol = comp.getModelCol();
|
||||
// HashMap<String,Object> dataMap = new HashMap<>();
|
||||
// dataMap.put(modelCol,map.get(k));
|
||||
// dataMap.put(DataCollectConstant.UPDATE_TIME, updateTime);
|
||||
// modelObject.put(modelCol, dataMap);
|
||||
}
|
||||
});
|
||||
//这个过滤的目的是只接受有用的字段
|
||||
// log.info("入库前数据为: {} ,", modelObject);
|
||||
return modelObject;
|
||||
}
|
||||
|
||||
//设备模型映射为平台模型实体
|
||||
@Override
|
||||
public JSONObject convertModelFromDevice(JSONObject dataSet, List<ModelDeviceColComp> compList, String deviceType) {
|
||||
JSONObject modelObject = new JSONObject();
|
||||
//根据映射表中的数据字段进行一一转换, 为提高检索效率,先把配置数据映射为 device_col为key的map
|
||||
if (dataSet == null || compList == null || compList.isEmpty()) {
|
||||
return modelObject;
|
||||
}
|
||||
Map<String, ModelDeviceColComp> convertMap = compList.stream().collect(Collectors.toMap(ModelDeviceColComp::getDeviceCol, ModelDeviceColComp -> ModelDeviceColComp));
|
||||
//遍历dataSet并进行转换
|
||||
dataSet.forEach((k, v) -> {
|
||||
//map中包含传入的实体中字段,就给映射
|
||||
if (convertMap.containsKey(k)) {
|
||||
//拿到映射实体
|
||||
ModelDeviceColComp comp = convertMap.get(k);
|
||||
String modelCol = comp.getModelCol();
|
||||
modelObject.put(modelCol, v);
|
||||
}
|
||||
});
|
||||
return modelObject;
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,107 @@
|
||||
package com.ho.datacollect.service.impl;
|
||||
|
||||
import com.ho.business.entity.Topology;
|
||||
import com.ho.business.service.TopologyService;
|
||||
import com.ho.business.vo.req.TopologyReqVo;
|
||||
import com.ho.datacollect.api.constant.DataCollectConstant;
|
||||
import com.ho.datacollect.api.vo.req.add.DataSetParam;
|
||||
import com.ho.datacollect.feignclient.DataCollectTdFeignClient;
|
||||
import com.ho.datacollect.service.DataCollectService;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* @author
|
||||
* @desc: 拓扑数据
|
||||
* 这个地方需要把拓扑数据和设备数据都入库
|
||||
* @date
|
||||
*/
|
||||
|
||||
@Service(DataCollectConstant.KEY_MAP.TOPOLOGY)
|
||||
@Slf4j
|
||||
public class TopologyDataCollectServiceImpl implements DataCollectService {
|
||||
|
||||
@Autowired
|
||||
TopologyService topologyService;
|
||||
|
||||
@Autowired
|
||||
DataCollectTdFeignClient tdFeignClient;
|
||||
|
||||
@Override
|
||||
public void doService(DataSetParam dataSetParam) {
|
||||
log.info("TopologyDataCollectServiceImpl=================");
|
||||
try {
|
||||
if (dataSetParam == null) {
|
||||
return;
|
||||
}
|
||||
//取topology
|
||||
List<Topology> topologyList = dataSetParam.getList();
|
||||
if (topologyList == null || topologyList.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
Integer groupId = dataSetParam.getGroupId();
|
||||
Integer stationId = dataSetParam.getStationId();
|
||||
for (Topology topology : topologyList) {
|
||||
Topology topo = new Topology();
|
||||
//id设置为空,自增
|
||||
topo.setId(null);
|
||||
topo.setPid(topology.getPid());
|
||||
topo.setSrcId(topology.getId());
|
||||
topo.setGroupId(groupId);
|
||||
topo.setStationId(stationId);
|
||||
topo.setName(topology.getName());
|
||||
topo.setCategory(topology.getCategory());
|
||||
topo.setType(topology.getType());
|
||||
topo.setSubType(topology.getSubType());
|
||||
//遍历link连接点赋值
|
||||
/*
|
||||
JSONArray link = topologyMqtt.getLink();
|
||||
List<JSONObject> linkList = link.toJavaList(JSONObject.class);
|
||||
for (JSONObject lidJsonObesct : linkList) {
|
||||
if (topo.getLink1() == null) {
|
||||
Integer lid = lidJsonObesct.getInteger("lid");
|
||||
topo.setLink1(lid);
|
||||
continue;
|
||||
}
|
||||
if (topo.getLink2() == null) {
|
||||
Integer lid = lidJsonObesct.getInteger("lid");
|
||||
topo.setLink1(lid);
|
||||
continue;
|
||||
}
|
||||
if (topo.getLink3() == null) {
|
||||
Integer lid = lidJsonObesct.getInteger("lid");
|
||||
topo.setLink1(lid);
|
||||
continue;
|
||||
}
|
||||
if (topo.getLink4() == null) {
|
||||
Integer lid = lidJsonObesct.getInteger("lid");
|
||||
topo.setLink1(lid);
|
||||
continue;
|
||||
}
|
||||
}*/
|
||||
//入库前判断是否已存在
|
||||
TopologyReqVo topologyReqVo = new TopologyReqVo();
|
||||
topologyReqVo.setGroupId(groupId);
|
||||
topologyReqVo.setStationId(stationId);
|
||||
topologyReqVo.setSrcId(topology.getId());
|
||||
Topology topogy = topologyService.selectBysev(topologyReqVo);
|
||||
if (topogy != null) {
|
||||
topo.setId(topogy.getId());
|
||||
topologyService.updateByPrimaryKey(topo);
|
||||
}
|
||||
// 先模拟报唯一主键错误
|
||||
else {
|
||||
topologyService.insertSelective(topo);
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
//} catch (BeansException e) {
|
||||
} catch (Exception e) {
|
||||
log.error("添加或更新topology数据失败: " + e.getMessage());
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,156 @@
|
||||
package com.ho.datacollect.service.impl;
|
||||
|
||||
import cn.hutool.core.convert.Convert;
|
||||
import cn.hutool.core.date.DateTime;
|
||||
import cn.hutool.core.date.DateUtil;
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
import com.ho.business.entity.ModelDeviceColComp;
|
||||
import com.ho.business.service.ModelDeviceService;
|
||||
import com.ho.business.service.TopologyService;
|
||||
import com.ho.business.vo.DeviceTransfer;
|
||||
import com.ho.common.tools.constant.CommonConstant;
|
||||
import com.ho.common.tools.service.RedisService;
|
||||
import com.ho.common.tools.util.UpdateObjectUtil;
|
||||
import com.ho.datacollect.api.constant.DataCollectConstant;
|
||||
import com.ho.datacollect.api.vo.req.add.DataSetParam;
|
||||
import com.ho.datacollect.api.vo.req.add.TdUniversalMeterReq;
|
||||
import com.ho.datacollect.feignclient.DataCollectTdFeignClient;
|
||||
import com.ho.datacollect.service.DataCollectService;
|
||||
import com.ho.datacollect.service.ModelConvertService;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.math.BigDecimal;
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* @Description 通用电表
|
||||
* Author yule
|
||||
* Date 2022/10/11 10:09
|
||||
*/
|
||||
@Service(DataCollectConstant.KEY_MAP.UNIVERSAL_METER)
|
||||
@Slf4j
|
||||
public class UniversalMeterDataCollectServiceImpl implements DataCollectService {
|
||||
|
||||
@Autowired
|
||||
TopologyService topologyService;
|
||||
|
||||
@Autowired
|
||||
DataCollectTdFeignClient tdFeignClient;
|
||||
|
||||
@Autowired
|
||||
RedisService redisService;
|
||||
|
||||
@Autowired
|
||||
UpdateObjectUtil updateObjectUtil;
|
||||
|
||||
@Autowired
|
||||
ModelConvertService modelConvertService;
|
||||
|
||||
@Autowired
|
||||
ModelDeviceService modelDeviceService;
|
||||
|
||||
@Override
|
||||
public void doService(DataSetParam dataSetParam) {
|
||||
String deviceType = null;
|
||||
try {
|
||||
//
|
||||
log.info("UniversalMeterDataCollectServiceImpl: dataSetParam:{}" ,dataSetParam);
|
||||
//拼接表名
|
||||
Integer groupId = dataSetParam.getGroupId();
|
||||
Integer stationId = dataSetParam.getStationId();
|
||||
Integer srcId = dataSetParam.getSrcId();
|
||||
//设备类型
|
||||
deviceType =dataSetParam.getDeviceType();
|
||||
//时间送过来是个字符串,要转成Date类型
|
||||
String freezeTime = dataSetParam.getFreezeTime();
|
||||
//捕获异常,如果时间转换出错,就使用现在的时间
|
||||
DateTime ts = new DateTime();
|
||||
Date createTime = new Date();
|
||||
Date updateTime = Convert.toDate(freezeTime);
|
||||
try {
|
||||
ts = DateUtil.parse(freezeTime, CommonConstant.DATE_YMD_HMSS);
|
||||
} catch (Exception e) {
|
||||
log.error("freezeTime 格式转换时出错");
|
||||
}
|
||||
String tableName = new StringBuilder().append(groupId).append(DataCollectConstant.TABLE_LINK)
|
||||
.append(stationId).append(DataCollectConstant.TABLE_LINK).append(srcId).toString();
|
||||
log.info("产生的tableName: {}" ,tableName);
|
||||
//创建新增对象
|
||||
TdUniversalMeterReq tdUniversalMeterReq = new TdUniversalMeterReq();
|
||||
//共通属性
|
||||
tdUniversalMeterReq.setGroupId(groupId);
|
||||
tdUniversalMeterReq.setStationId(stationId);
|
||||
tdUniversalMeterReq.setSrcId(srcId);
|
||||
tdUniversalMeterReq.setTableName(tableName);
|
||||
tdUniversalMeterReq.setDeviceType(deviceType);
|
||||
JSONObject dataSet = dataSetParam.getDataSet();
|
||||
log.info("deviceType: {}" ,deviceType);
|
||||
log.info("dataSet: {} " ,dataSet);
|
||||
|
||||
//从设备定义表中得到需要哪些数据,不是所有数据都要入库
|
||||
//需要入库的字段是根据外部设备模型查出来的字段集合
|
||||
//dataSet赋值6个
|
||||
if(dataSet==null){
|
||||
dataSet= new JSONObject();
|
||||
}
|
||||
JSONObject modifyDataObject = new JSONObject();
|
||||
dataSet.forEach((k, v) -> {
|
||||
DeviceTransfer deviceTransfer = new DeviceTransfer();
|
||||
deviceTransfer.setValue(new BigDecimal(String.valueOf(v)));
|
||||
deviceTransfer.setUpdateTime(updateTime);
|
||||
//通讯故障保持原有状态
|
||||
if(DataCollectConstant.COMMUNICATION_STATUS.equals(k)){
|
||||
modifyDataObject.put(k,v);
|
||||
}else{
|
||||
modifyDataObject.put(k,deviceTransfer);
|
||||
}
|
||||
});
|
||||
List<ModelDeviceColComp> compList = modelDeviceService.getCompListByType(CommonConstant.ModelDeviceType.DEVICE, deviceType,null);
|
||||
// if (compList.isEmpty()) {
|
||||
// log.error("该设备未配置映射字段");
|
||||
// return;
|
||||
// }
|
||||
//入库的数据是设备原始格式,但需要经过一轮匹配,匹配到才入库不是每个字段都入库
|
||||
JSONObject inputData = modelConvertService.getInputDataByDeviceType(dataSet, compList, deviceType);
|
||||
// if (inputData.isEmpty()) {
|
||||
// log.error("转换后无匹配的数据字段 ,无需入库");
|
||||
// return;
|
||||
// }
|
||||
//时序数据对象, 把两个时间字段放进去
|
||||
inputData.put(DataCollectConstant.TS, ts);
|
||||
inputData.put(DataCollectConstant.CREATE_TIME, createTime);
|
||||
//将设备数据格式转换为平台数据格式(注意这个是平台数据格式)
|
||||
JSONObject modelObject = modifyDataObject;
|
||||
modelObject.put(DataCollectConstant.TS, ts);
|
||||
//最后更新时间
|
||||
modelObject.put(DataCollectConstant.UPDATE_TIME,createTime);
|
||||
JSONObject convertInputData = modelConvertService.convertModelFromDevice(modifyDataObject, compList, deviceType);
|
||||
log.info("映射的字段 convertInputData:" + convertInputData);
|
||||
if(!convertInputData.isEmpty()){
|
||||
modelObject.putAll(convertInputData);
|
||||
}
|
||||
//先判断缓存中是否存在
|
||||
String redisKey = deviceType + ":" + stationId + ":" + srcId;
|
||||
Boolean flag = redisService.hasKey(redisKey);
|
||||
//缓存的数据是转换后的平台模型
|
||||
if (!flag) {
|
||||
//没数据新建一个createTime
|
||||
modelObject.put(DataCollectConstant.CREATE_TIME, createTime);
|
||||
//设备类型塞进去
|
||||
modelObject.put(DataCollectConstant.DEVICE_TYPE, deviceType);
|
||||
}
|
||||
log.info("modelObject: {}", modelObject);
|
||||
redisService.hmset(redisKey, modelObject);
|
||||
//入库时候调用不同逆变器入库方法,因为数据格式不同
|
||||
inputData.putAll(dataSet);
|
||||
log.info("inputData:{},", inputData);
|
||||
tdUniversalMeterReq.setUniversalMeter(inputData);
|
||||
tdFeignClient.insertUniversalMeterOne(tdUniversalMeterReq);
|
||||
} catch (Exception e) {
|
||||
log.error("添加通用电表:{} 失败: {}" ,deviceType, e.getMessage());
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,75 @@
|
||||
package com.ho.datacollect.util;
|
||||
|
||||
public class AnotherMqttConfigUtil {
|
||||
|
||||
public static String[] commonTopic = new String[]{
|
||||
"+/device/27d83a2844ff5866",
|
||||
"+/device/77ba753718908d1a",
|
||||
"1/device/+"
|
||||
};
|
||||
|
||||
/**
|
||||
* 获取登录验证的监听主题
|
||||
* @return
|
||||
*/
|
||||
public static String[] getLoginRequestTopic(){
|
||||
String log = "/login/request";
|
||||
String[] str = new String[commonTopic.length];
|
||||
for (int i = 0; i < commonTopic.length; i++) {
|
||||
str[i] = commonTopic[i]+log;
|
||||
}
|
||||
return str;
|
||||
}
|
||||
|
||||
/**
|
||||
* 读取文件请求的监听主题
|
||||
* @return
|
||||
*/
|
||||
public static String[] getReadRequestTopic(){
|
||||
String log = "/read/response";
|
||||
String[] str = new String[commonTopic.length];
|
||||
for (int i = 0; i < commonTopic.length; i++) {
|
||||
str[i] = commonTopic[i]+log;
|
||||
}
|
||||
return str;
|
||||
}
|
||||
|
||||
/**
|
||||
* 写文件响应的监听主题
|
||||
* @return
|
||||
*/
|
||||
public static String[] getWriteRequestTopic(){
|
||||
String log = "/write/response";
|
||||
String[] str = new String[commonTopic.length];
|
||||
for (int i = 0; i < commonTopic.length; i++) {
|
||||
str[i] = commonTopic[i]+log;
|
||||
}
|
||||
return str;
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取数据上送监听主题
|
||||
* @return
|
||||
*/
|
||||
public static String[] getReportPushTopic(){
|
||||
String log = "/report/push";
|
||||
String[] str = new String[commonTopic.length];
|
||||
for (int i = 0; i < commonTopic.length; i++) {
|
||||
str[i] = commonTopic[i]+log;
|
||||
}
|
||||
return str;
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取远程控制主题
|
||||
* @return
|
||||
*/
|
||||
public static String[] getControlResponseTopic(){
|
||||
String log = "/control/response";
|
||||
String[] str = new String[commonTopic.length];
|
||||
for (int i = 0; i < commonTopic.length; i++) {
|
||||
str[i] = commonTopic[i]+log;
|
||||
}
|
||||
return str;
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,184 @@
|
||||
package com.ho.datacollect.util;
|
||||
|
||||
public class MqttConfigUtil {
|
||||
|
||||
public static String[] commonTopic = new String[]{
|
||||
"+/device/fa22fd97b39c04c8",
|
||||
"+/device/803274d9432df350",
|
||||
"+/device/e4067161c4ab1929",
|
||||
"+/device/f0e9a01d8b98e820",
|
||||
"+/device/f6c6c5b5dfcc73bb",
|
||||
"+/device/9038222e7fb8789d",
|
||||
"+/device/a707000000000000",
|
||||
"+/device/811b4eb0f8e99c12",
|
||||
"+/device/c2c574a5c691bf69",
|
||||
"+/device/5265899ad223c157",
|
||||
"+/device/517664ba87ac49ec",
|
||||
"+/device/b602b10956119d39",
|
||||
"+/device/4c7dd125b6da91fd",
|
||||
"+/device/581bf6724737da0c",
|
||||
"+/device/8a2396ad453891b1",
|
||||
"+/device/917ca24a9ccdf809",
|
||||
"+/device/a8402702a1d41d88",
|
||||
"+/device/7a5202c7dc74afd6",
|
||||
"+/device/bfe7a19ced50c54d",
|
||||
"+/device/21f835330b485415",
|
||||
"+/device/a978d559eeb0a32e",
|
||||
"+/device/4dddf8b0caae7d8b",
|
||||
"+/device/beff9c2ea2d210c4",
|
||||
"+/device/a5af67550fd4dc50",
|
||||
"+/device/0c3e8eadd58f8a51",
|
||||
"+/device/14ed724c77b73494",
|
||||
"+/device/d12d361c6bfef025",
|
||||
"+/device/781c2bf41a6ffa5a",
|
||||
"+/device/ac10829b20169da0",
|
||||
"+/device/ec939740502f3a66",
|
||||
"+/device/e0c812e00ac4e006",
|
||||
"+/device/0000000000000000",
|
||||
"+/device/5e9b285116453b1a",
|
||||
"+/device/eeb34d0de2b6a953",
|
||||
"+/device/56501a13712f9a6e",
|
||||
"+/device/e8d98e91aaa7b04f",
|
||||
"+/device/6c6978a7fb9a8f6d",
|
||||
"+/device/6803ef0410fb7c02",
|
||||
"+/device/fe828101e353b919",
|
||||
"+/device/42ff4118453e41ca",
|
||||
"+/device/d9f866f14483c92d",
|
||||
"+/device/1deae4a5f1400666",
|
||||
"+/device/a608ea4ffb5221d7",
|
||||
"+/device/bd0ccfe06c1d3596",
|
||||
"+/device/262cdfc279065aa0",
|
||||
"+/device/3df87897c35ae0c5",
|
||||
"+/device/8ca0297304c266b6",
|
||||
"+/device/1200000000000000",
|
||||
"+/device/333470f1dccfb19b",
|
||||
"+/device/02bd28350b1c2619",
|
||||
"+/device/9688431d4f01806e",
|
||||
"+/device/a2f3973e0c3b4835",
|
||||
"+/device/f91779c511763a5b",
|
||||
"+/device/2b8924e509ee8559",
|
||||
"+/device/9e4ee680a26303f7",
|
||||
"+/device/56d9bb7131c724b5",
|
||||
"+/device/9511bebccc477456",
|
||||
"+/device/0a3146610bcc5440",
|
||||
"+/device/e817466c6127ab44",
|
||||
"+/device/99efbb96bc40fd15",
|
||||
"+/device/43687f5b1ebecb64",
|
||||
"+/device/b9e0cdfdd4df7736",
|
||||
"+/device/ed76d5ab39f6c42d",
|
||||
"+/device/63f8d5df0c9d647c",
|
||||
"+/device/98b61a1f166b8419",
|
||||
"+/device/201913c33e0318ee",
|
||||
"+/device/9c74bebb12d05635",
|
||||
"+/device/1f237999be964f42",
|
||||
"+/device/104cebca14ffa98c",
|
||||
"+/device/2d1cab6edbfaeb94",
|
||||
"+/device/c8c30d1decff167d",
|
||||
"+/device/984423fdfda19376",
|
||||
"+/device/18db1248acf66a3e",
|
||||
"+/device/e18710921a53bfff",
|
||||
"+/device/04878a29f87e237d",
|
||||
"+/device/c336512bd77d6cbc",
|
||||
"+/device/01ac2a161029e555",
|
||||
"+/device/4eeccb413407a4e4",
|
||||
"+/device/14e7f5c9b123360d",
|
||||
"+/device/daf25437cbc31a00",
|
||||
"+/device/44dfc09110e1f994",
|
||||
"+/device/2dd522056c132349",
|
||||
"+/device/bd7f78f9d070abe6",
|
||||
"+/device/e932c594ce083810",
|
||||
"+/device/b80c3b7513773c7f",
|
||||
"+/device/7f0d37c30d28ed0f",
|
||||
"+/device/53be81deba3b9c74",
|
||||
"+/device/fe5f56cc9029bf20",
|
||||
"+/device/60723affac3821a5",
|
||||
"+/device/d3e22616db04dc52",
|
||||
"+/device/36c12afabd8fa201",
|
||||
"+/device/71db70b8ce2eb0d1",
|
||||
"+/device/69f319418cfe13a2",
|
||||
"+/device/f043173d1fd8cb0d",
|
||||
"+/device/9b5084678310c4da",
|
||||
"+/device/fa3936fdeb8f8cd1",
|
||||
"+/device/0819c35644808b72",
|
||||
"+/device/40442c5a0a29de37",
|
||||
"+/device/75fa0ee5048bd500",
|
||||
"+/device/39674be356de68ad",
|
||||
"+/device/b490b672a5f76716",
|
||||
"+/device/6a3ba96ed146872b",
|
||||
"+/device/3a785a63862c213d",
|
||||
"+/device/2d29a0fbac938329",
|
||||
"+/device/7d97391a68f8d6af",
|
||||
"+/device/b50a1edd44549876",
|
||||
"+/device/7c31d3c5c077228f",
|
||||
"+/device/99e513be6075f8c6",
|
||||
"+/device/5d4297256f02ebc2",
|
||||
"+/device/67aa37e699e1e08f",
|
||||
"+/device/ea0ebfbfa1487bd2",
|
||||
"+/device/aa8a43d326dddb3f"
|
||||
};
|
||||
|
||||
/**
|
||||
* 获取登录验证的监听主题
|
||||
* @return
|
||||
*/
|
||||
public static String[] getLoginRequestTopic(){
|
||||
String log = "/login/request";
|
||||
String[] str = new String[commonTopic.length];
|
||||
for (int i = 0; i < commonTopic.length; i++) {
|
||||
str[i] = commonTopic[i]+log;
|
||||
}
|
||||
return str;
|
||||
}
|
||||
|
||||
/**
|
||||
* 读取文件请求的监听主题
|
||||
* @return
|
||||
*/
|
||||
public static String[] getReadRequestTopic(){
|
||||
String log = "/read/response";
|
||||
String[] str = new String[commonTopic.length];
|
||||
for (int i = 0; i < commonTopic.length; i++) {
|
||||
str[i] = commonTopic[i]+log;
|
||||
}
|
||||
return str;
|
||||
}
|
||||
|
||||
/**
|
||||
* 写文件响应的监听主题
|
||||
* @return
|
||||
*/
|
||||
public static String[] getWriteRequestTopic(){
|
||||
String log = "/write/response";
|
||||
String[] str = new String[commonTopic.length];
|
||||
for (int i = 0; i < commonTopic.length; i++) {
|
||||
str[i] = commonTopic[i]+log;
|
||||
}
|
||||
return str;
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取数据上送监听主题
|
||||
* @return
|
||||
*/
|
||||
public static String[] getReportPushTopic(){
|
||||
String log = "/report/push";
|
||||
String[] str = new String[commonTopic.length];
|
||||
for (int i = 0; i < commonTopic.length; i++) {
|
||||
str[i] = commonTopic[i]+log;
|
||||
}
|
||||
return str;
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取远程控制主题
|
||||
* @return
|
||||
*/
|
||||
public static String[] getControlResponseTopic(){
|
||||
String log = "/control/response";
|
||||
String[] str = new String[commonTopic.length];
|
||||
for (int i = 0; i < commonTopic.length; i++) {
|
||||
str[i] = commonTopic[i]+log;
|
||||
}
|
||||
return str;
|
||||
}
|
||||
}
|
||||
100
data-collect-service/src/main/resources/application-dev.yml
Normal file
100
data-collect-service/src/main/resources/application-dev.yml
Normal file
@ -0,0 +1,100 @@
|
||||
server:
|
||||
port: 8010
|
||||
servlet:
|
||||
context-path: /api
|
||||
|
||||
#swagger2配置
|
||||
swagger2:
|
||||
enable: true #开启swagger
|
||||
projectName: 云平台--business-前端接口
|
||||
controllerPath: com.ho.datacollect.controller
|
||||
|
||||
#mqtt
|
||||
mqtt:
|
||||
url: tcp://123.60.190.77:1883 # 线上环境MQTT
|
||||
userName: admin
|
||||
passWord: public
|
||||
timeout: 5000
|
||||
keepAlive: 60
|
||||
|
||||
#密钥
|
||||
secret:
|
||||
loginSecret: 6131231@#42a765#
|
||||
|
||||
#topic
|
||||
topic:
|
||||
edgeLoginRequest: +/device/+/login/request
|
||||
cloudLoginResponse: +/cloud/+/login/response
|
||||
|
||||
cloudReadRequest: +/cloud/+/read/request
|
||||
edgeReadResponse: +/device/+/read/response
|
||||
cloudWriteRequest: +/cloud/+/write/request
|
||||
edgeWriteResponse: +/device/+/write/response
|
||||
|
||||
cloudReportPush: +/cloud/+/report/push
|
||||
edgeReportPush: +/device/+/report/push
|
||||
|
||||
cloudControlRequest: +/cloud/+/control/request
|
||||
edgeControlResponse: +/device/+/control/response
|
||||
|
||||
|
||||
|
||||
spring:
|
||||
jackson:
|
||||
time-zone: GMT+8
|
||||
application:
|
||||
name: datacollect-service
|
||||
cloud:
|
||||
nacos:
|
||||
discovery:
|
||||
server-addr: 127.0.0.1:8848
|
||||
datasource:
|
||||
driver-class-name: com.mysql.cj.jdbc.Driver
|
||||
#url: jdbc:mysql://1.95.153.121:3306/business_db?useUnicode=true&characterEncoding=utf-8&serverTimezone=Asia/Shanghai
|
||||
url: jdbc:mysql://192.168.100.244:3306/business_db?useUnicode=true&characterEncoding=utf-8&serverTimezone=Asia/Shanghai
|
||||
#username: zzkj
|
||||
username: root
|
||||
#password: zzkj@688737
|
||||
password: 123456
|
||||
druid:
|
||||
initialSize: 5
|
||||
minIdle: 5
|
||||
maxActive: 5
|
||||
keepAlive: true #保持长连接
|
||||
connection-error-retry-attempts: 3
|
||||
#Redis
|
||||
redis:
|
||||
port: 6379 #端口
|
||||
timeout: 3000ms #连接超时
|
||||
#host: 1.95.153.121 #单机
|
||||
host: 192.168.100.242 #单机
|
||||
#password: zzkj@688737
|
||||
password: 123456
|
||||
database: 0
|
||||
|
||||
#port: 6379 #端口
|
||||
#timeout: 5000ms #连接超时
|
||||
#host: 127.0.0.1 #单机
|
||||
#password:
|
||||
#database: 0
|
||||
#集群 真实环境开启
|
||||
# cluster:
|
||||
# nodes:
|
||||
# - 127.0.0.1:6379
|
||||
# - 127.0.0.1:6380
|
||||
# - 127.0.0.1:6381
|
||||
lettuce:
|
||||
pool:
|
||||
max-active: 8 #连接池最大连接 默认8
|
||||
max-idle: 8 #连接池中最大空闲连接 默认8
|
||||
min-idle: 1 #连接池中最小空闲连接 默认0
|
||||
max-wait: 2000ms #连接池最大阻塞等待时间 使用负值表示没有限制
|
||||
|
||||
minio:
|
||||
accessKey: admin
|
||||
secretKey: zzkj@688737
|
||||
endpoint: http://192.168.0.236:9000
|
||||
prefixUrl: http://192.168.0.236:9000
|
||||
|
||||
#跳过登录:true ,不跳过:false
|
||||
skipLogin: false
|
||||
3
data-collect-service/src/main/resources/application.yml
Normal file
3
data-collect-service/src/main/resources/application.yml
Normal file
@ -0,0 +1,3 @@
|
||||
spring:
|
||||
profiles:
|
||||
active: dev
|
||||
73
data-collect-service/src/main/resources/bootstrap.yml
Normal file
73
data-collect-service/src/main/resources/bootstrap.yml
Normal file
@ -0,0 +1,73 @@
|
||||
spring:
|
||||
#caffeine缓存
|
||||
cache:
|
||||
type: caffeine
|
||||
caffeine:
|
||||
spec: initialCapacity=50,maximumSize=500,expireAfterWrite=300s
|
||||
cache-names: stationBySn,device,modelConfigList,topologyList
|
||||
|
||||
mybatis:
|
||||
type-aliases-package: com.ho.datacollect.entity
|
||||
mapper-locations: classpath:mapper/*.xml
|
||||
configuration:
|
||||
#驼峰
|
||||
mapUnderscoreToCamelCase: true
|
||||
|
||||
#Logging
|
||||
logging:
|
||||
config: classpath:logback.xml
|
||||
level:
|
||||
com.ho.datacollect.mapper: info
|
||||
|
||||
|
||||
#水电气转换系数 todo 转换系数
|
||||
energy:
|
||||
convertFactor:
|
||||
elec: 2
|
||||
water: 2
|
||||
gas: 2
|
||||
#二氧化碳到煤的转换系数
|
||||
co2ToCole: 0.4
|
||||
#二氧化碳到树的转化系数
|
||||
co2ToTree: 1800
|
||||
#二氧化碳到汽油的转化系数
|
||||
co2ToGasoline: 2.30
|
||||
#树到平方公里的转化系数
|
||||
treeToSquareKilometer: 40000
|
||||
#对比电站下的采集点时间差值
|
||||
station:
|
||||
timeDifference: 10
|
||||
|
||||
#逆变器配置
|
||||
inverter:
|
||||
#查询逆变器发电量的时间间隔 分钟
|
||||
timeInterval: 15
|
||||
co2Factor: 0.997
|
||||
five: 5
|
||||
|
||||
# 大屏配置
|
||||
largeScreen:
|
||||
#收益系数
|
||||
income: 0.6
|
||||
|
||||
#mqtt接外部数据使用
|
||||
mqtt1:
|
||||
url: tcp://1.95.131.171:1883
|
||||
userName: root
|
||||
passWord: zzkj@688737
|
||||
timeout: 5000
|
||||
keepAlive: 60
|
||||
|
||||
|
||||
#默认逻辑开关控制 使用默认逻辑为true,不使用为false
|
||||
switch:
|
||||
#心跳
|
||||
controlResponseConsumerDefaultLogic: true
|
||||
#登录
|
||||
loginRequestConsumerDefaultLogic: true
|
||||
#响应边端
|
||||
readResponseConsumerDefaultLogic: true
|
||||
#上送
|
||||
reportPushConsumerDefaultLogic: true
|
||||
#写返回
|
||||
writeResponseConsumerDefaultLogic: true
|
||||
Binary file not shown.
70
data-collect-service/src/main/resources/logback.xml
Normal file
70
data-collect-service/src/main/resources/logback.xml
Normal file
@ -0,0 +1,70 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<!DOCTYPE xml>
|
||||
<configuration>
|
||||
<appender name="console" class="ch.qos.logback.core.ConsoleAppender">
|
||||
<encoder>
|
||||
<pattern>%date{yyyy-MM-dd HH:mm:ss.SSS,CTT} [%thread] %-5level %logger{36} - %msg%n
|
||||
</pattern>
|
||||
</encoder>
|
||||
</appender>
|
||||
|
||||
<appender name="rollingFile" class="ch.qos.logback.core.rolling.RollingFileAppender">
|
||||
<file>/home/hocloud/logs/datacollect-service/datacollect-service.log</file>
|
||||
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
|
||||
<fileNamePattern>/home/hocloud/logs/datacollect-service/datacollect-service.%d{yyyy-MM-dd}-%i.log
|
||||
</fileNamePattern>
|
||||
<maxHistory>10</maxHistory>
|
||||
<!-- 除按日志记录之外,还配置了日志文件不能超过2M,若超过2M,日志文件会以索引0开始,
|
||||
命名日志文件,例如log-error-2013-12-21.0.log -->
|
||||
<timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
|
||||
<maxFileSize>256MB</maxFileSize>
|
||||
</timeBasedFileNamingAndTriggeringPolicy>
|
||||
</rollingPolicy>
|
||||
<encoder>
|
||||
<pattern>%date{yyyy-MM-dd HH:mm:ss.SSS,CTT} [%thread] %-5level %logger{36} - %msg%n
|
||||
</pattern>
|
||||
</encoder>
|
||||
</appender>
|
||||
|
||||
<!--Error级别-->
|
||||
<appender name="errorFile" class="ch.qos.logback.core.rolling.RollingFileAppender">
|
||||
<file>/home/hocloud/logs/datacollect-service/datacollect-service-error.log</file>
|
||||
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
|
||||
<fileNamePattern>/home/hocloud/logs/datacollect-service/datacollect-service-error.%d{yyyy-MM-dd}-%i.log
|
||||
</fileNamePattern>
|
||||
<!--180天-->
|
||||
<maxHistory>10</maxHistory>
|
||||
<!-- 除按日志记录之外,还配置了日志文件不能超过2M,若超过2M,日志文件会以索引0开始,
|
||||
命名日志文件,例如log-error-2013-12-21.0.log -->
|
||||
<timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
|
||||
<maxFileSize>256MB</maxFileSize>
|
||||
</timeBasedFileNamingAndTriggeringPolicy>
|
||||
</rollingPolicy>
|
||||
<encoder>
|
||||
<pattern>%date{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n
|
||||
</pattern>
|
||||
</encoder>
|
||||
<!-- 此filter过滤debug级别以下的日志-->
|
||||
<filter class="ch.qos.logback.classic.filter.ThresholdFilter">
|
||||
<level>ERROR</level>
|
||||
</filter>
|
||||
<!-- 级别过滤器,根据日志级别进行过滤。-->
|
||||
<filter class="ch.qos.logback.classic.filter.LevelFilter">
|
||||
<level>ERROR</level>
|
||||
<onMatch>ACCEPT</onMatch>
|
||||
<onMismatch>DENY</onMismatch>
|
||||
</filter>
|
||||
</appender>
|
||||
|
||||
<!-- project default level -->
|
||||
<logger name="com.ho.business" level="info"/>
|
||||
|
||||
<!--log4jdbc -->
|
||||
<logger name="jdbc.sqltiming" level="DEBUG"/>
|
||||
|
||||
<root level="INFO">
|
||||
<appender-ref ref="console"/>
|
||||
<appender-ref ref="rollingFile"/>
|
||||
<appender-ref ref="errorFile"/>
|
||||
</root>
|
||||
</configuration>
|
||||
187
data-collect-service/src/test/java/CloudSendMsg.java
Normal file
187
data-collect-service/src/test/java/CloudSendMsg.java
Normal file
@ -0,0 +1,187 @@
|
||||
import cn.hutool.core.date.DateTime;
|
||||
import cn.hutool.core.date.DateUtil;
|
||||
import cn.hutool.core.util.IdUtil;
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
import com.ho.common.tools.constant.CommonConstant;
|
||||
import com.ho.common.tools.service.RedisService;
|
||||
import com.ho.common.tools.util.UpdateObjectUtil;
|
||||
import com.ho.datacollect.api.vo.req.Meter;
|
||||
import com.ho.datacollect.api.vo.req.mqtt.DataJson;
|
||||
import com.ho.datacollect.api.vo.req.mqtt.MqttJson;
|
||||
import com.ho.common.tools.util.MqttUtil;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.eclipse.paho.client.mqttv3.IMqttToken;
|
||||
import org.eclipse.paho.client.mqttv3.MqttClient;
|
||||
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
|
||||
import org.eclipse.paho.client.mqttv3.MqttMessage;
|
||||
import org.junit.Test;
|
||||
import org.springframework.test.context.ContextConfiguration;
|
||||
|
||||
import java.math.BigDecimal;
|
||||
import java.util.*;
|
||||
|
||||
/**
|
||||
* @Description 云端模拟发送消息测试
|
||||
* Author yule
|
||||
* Date 2022/12/7 11:28
|
||||
*/
|
||||
@ContextConfiguration
|
||||
@Slf4j
|
||||
public class CloudSendMsg {
|
||||
|
||||
|
||||
|
||||
@Test
|
||||
public void testReadRequest() {
|
||||
String clientId = IdUtil.simpleUUID();
|
||||
log.info("clientId:" + clientId);
|
||||
try {
|
||||
MqttClient client = new MqttClient("tcp://192.168.1.198:1883", clientId);
|
||||
MqttConnectOptions options = new MqttConnectOptions();
|
||||
options.setUserName("admin");
|
||||
options.setPassword("public".toCharArray());
|
||||
options.setCleanSession(true);
|
||||
IMqttToken iMqttToken = client.connectWithResult(options);
|
||||
boolean complete = iMqttToken.isComplete();
|
||||
log.info("mqtt建立连接:{}", complete);
|
||||
|
||||
MqttJson json = new MqttJson();
|
||||
json.setToken(new MqttUtil().getToken());
|
||||
json.setDataType(CommonConstant.MQTT_DATATYPE.READ_REQUEST);
|
||||
json.setIsZip(CommonConstant.MQTT_ISZIP.NOT_IS);
|
||||
DataJson data = new DataJson();
|
||||
data.setCmdId("abcd1234");
|
||||
List<String> list = new ArrayList<>();
|
||||
list.add("station_251/para/");
|
||||
list.add("station_251/bay_252/device_254/para/");
|
||||
list.add("station_251/bay_252/device_254/device_257/para/");
|
||||
data.setList(list);
|
||||
String dataString = JSONObject.toJSONString(data);
|
||||
json.setData(dataString);
|
||||
String jsonString = JSONObject.toJSONString(json);
|
||||
MqttMessage mqttMessageResponse = new MqttMessage();
|
||||
mqttMessageResponse.setPayload(jsonString.getBytes());
|
||||
mqttMessageResponse.setQos(0);
|
||||
mqttMessageResponse.setRetained(false);
|
||||
client.publish("251/cloud/APO1231564/read/request", mqttMessageResponse);
|
||||
log.info("topic:{},msg:{}", "251/cloud/APO1231564/read/request", jsonString);
|
||||
} catch (Exception e) {
|
||||
log.error(e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testWriteRequest() {
|
||||
String clientId = IdUtil.simpleUUID();
|
||||
log.info("clientId:" + clientId);
|
||||
try {
|
||||
MqttClient client = new MqttClient("tcp://192.168.1.198:1883", clientId);
|
||||
MqttConnectOptions options = new MqttConnectOptions();
|
||||
options.setUserName("admin");
|
||||
options.setPassword("public".toCharArray());
|
||||
options.setCleanSession(true);
|
||||
IMqttToken iMqttToken = client.connectWithResult(options);
|
||||
boolean complete = iMqttToken.isComplete();
|
||||
log.info("mqtt建立连接:{}", complete);
|
||||
|
||||
MqttJson json = new MqttJson();
|
||||
json.setToken(new MqttUtil().getToken());
|
||||
json.setDataType(CommonConstant.MQTT_DATATYPE.WRITE_REQUEST);
|
||||
json.setIsZip(CommonConstant.MQTT_ISZIP.NOT_IS);
|
||||
JSONObject data = new JSONObject();
|
||||
data.put("cmdId", "adcd1234");
|
||||
Map<String, Object> station = new HashMap<>();
|
||||
station.put("id", 1);
|
||||
Map<String, Object> para = new HashMap<>();
|
||||
para.put("name", "上海奉贤电厂New");
|
||||
station.put("para", para);
|
||||
data.put("station", station);
|
||||
String dataString = JSONObject.toJSONString(data);
|
||||
json.setData(dataString);
|
||||
String jsonString = JSONObject.toJSONString(json);
|
||||
MqttMessage mqttMessageResponse = new MqttMessage();
|
||||
mqttMessageResponse.setPayload(jsonString.getBytes());
|
||||
mqttMessageResponse.setQos(2);
|
||||
mqttMessageResponse.setRetained(false);
|
||||
client.publish("1/cloud/APO1231564/write/request", mqttMessageResponse);
|
||||
log.info("topic:{},msg:{}", "1/cloud/APO1231564/write/request", jsonString);
|
||||
} catch (Exception e) {
|
||||
log.error(e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDisconnect() {
|
||||
String clientId = IdUtil.simpleUUID();
|
||||
log.info("clientId:" + clientId);
|
||||
try {
|
||||
MqttClient client = new MqttClient("tcp://192.168.1.198:1883", clientId);
|
||||
MqttConnectOptions options = new MqttConnectOptions();
|
||||
options.setUserName("admin");
|
||||
options.setPassword("public".toCharArray());
|
||||
options.setCleanSession(true);
|
||||
IMqttToken iMqttToken = client.connectWithResult(options);
|
||||
boolean complete = iMqttToken.isComplete();
|
||||
log.info("mqtt建立连接:{}", complete);
|
||||
|
||||
MqttJson json = new MqttJson();
|
||||
json.setToken(new MqttUtil().getToken());
|
||||
json.setDataType(CommonConstant.MQTT_DATATYPE.DISCONNECT);
|
||||
json.setIsZip(CommonConstant.MQTT_ISZIP.NOT_IS);
|
||||
Map<String, Date> map = new HashMap<>();
|
||||
map.put("sendTime", new Date());
|
||||
String data = JSONObject.toJSONString(map);
|
||||
json.setData(data);
|
||||
String jsonString = JSONObject.toJSONString(json);
|
||||
String topic = "251/cloud/APO1231564/control/request";
|
||||
MqttMessage mqttMessageResponse = new MqttMessage();
|
||||
mqttMessageResponse.setPayload(jsonString.getBytes());
|
||||
mqttMessageResponse.setQos(2);
|
||||
mqttMessageResponse.setRetained(false);
|
||||
log.info("topic:{},msg:{}", topic, jsonString);
|
||||
client.publish(topic, mqttMessageResponse);
|
||||
RedisService redisService = new RedisService();
|
||||
//redisService.set(RedisKeyConstant.);
|
||||
} catch (Exception e) {
|
||||
log.error(e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void getString() {
|
||||
DateTime parse = DateUtil.parse("2022-12-12/11:24:30", CommonConstant.DATE);
|
||||
System.out.println(parse);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void object() throws IllegalAccessException {
|
||||
Meter meter1 = new Meter();
|
||||
meter1.setBackwardActE0(new BigDecimal(0));
|
||||
meter1.setBackwardActE1(new BigDecimal(1));
|
||||
meter1.setBackwardActE2(new BigDecimal(2));
|
||||
System.out.println(meter1);
|
||||
Meter meter2 = new Meter();
|
||||
meter1.setBackwardActE0(new BigDecimal(5));
|
||||
meter1.setBackwardActE1(new BigDecimal(6));
|
||||
meter1.setBackwardActE2(new BigDecimal(7));
|
||||
meter2.setBackwardActE3(new BigDecimal(3));
|
||||
meter2.setBackwardActE4(new BigDecimal(4));
|
||||
|
||||
UpdateObjectUtil util = new UpdateObjectUtil();
|
||||
Meter meter = (Meter) util.updateObjectAttribute(meter1, meter2);
|
||||
System.out.println(meter);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void json(){
|
||||
String date = "2023-01-08 13:12:12.333";
|
||||
String substring = date.substring(0, 7);
|
||||
String substring1 = date.substring(0, 4);
|
||||
System.out.println(substring);
|
||||
System.out.println(substring1);
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
||||
}
|
||||
40
data-collect-service/src/test/java/JsonTest.java
Normal file
40
data-collect-service/src/test/java/JsonTest.java
Normal file
@ -0,0 +1,40 @@
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
import com.ho.common.tools.constant.CommonConstant;
|
||||
import com.ho.datacollect.api.vo.req.mqtt.DataJson;
|
||||
import com.ho.datacollect.api.vo.req.mqtt.MqttJson;
|
||||
import com.ho.common.tools.util.MqttUtil;
|
||||
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* @author fancl
|
||||
* @desc:
|
||||
* @date 2022/12/10
|
||||
*/
|
||||
public class JsonTest {
|
||||
public static void main(String[] args) {
|
||||
MqttJson json = new MqttJson();
|
||||
json.setToken(new MqttUtil().getToken());
|
||||
json.setDataType(CommonConstant.MQTT_DATATYPE.READ_REQUEST);
|
||||
json.setIsZip(CommonConstant.MQTT_ISZIP.NOT_IS);
|
||||
DataJson data = new DataJson();
|
||||
data.setCmdId("abcd1234");
|
||||
List<String> list = new ArrayList<>();
|
||||
list.add("station_251/para/");
|
||||
list.add("station_251/bay_252/device_254/para/");
|
||||
list.add("station_251/bay_252/device_254/device_257/para/");
|
||||
data.setList(list);
|
||||
String dataString = JSONObject.toJSONString(data);
|
||||
json.setData(dataString);
|
||||
String jsonString = JSONObject.toJSONString(json);
|
||||
|
||||
HashMap<String, Object> hashMap = null;
|
||||
|
||||
HashMap<String, Object> hashMap1 = JSONObject.parseObject(jsonString.getBytes(StandardCharsets.UTF_8), Map.class);
|
||||
System.out.println();
|
||||
}
|
||||
}
|
||||
45
data-collect-service/src/test/java/MacTest.java
Normal file
45
data-collect-service/src/test/java/MacTest.java
Normal file
@ -0,0 +1,45 @@
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* @author fancl
|
||||
* @desc:
|
||||
* @date 2022/11/5
|
||||
*/
|
||||
public class MacTest {
|
||||
private static final String ALGORITHM = "HmacSHA256";
|
||||
public static void main(String[] args) throws Exception{
|
||||
List<String> list = new ArrayList<>();
|
||||
for (int i = 01; i <= 31; i++) {
|
||||
String s = String.valueOf(i);
|
||||
if (s.length() == 1){
|
||||
s = 0 + s;
|
||||
}
|
||||
list.add(s);
|
||||
}
|
||||
System.out.println(list);
|
||||
String date = "2020-01-15 15:45:42.444";
|
||||
String substring = date.substring(0, 4);
|
||||
System.out.println(substring);
|
||||
}
|
||||
|
||||
/**
|
||||
* 将byte转为16进制
|
||||
*
|
||||
* @param bytes
|
||||
* @return
|
||||
*/
|
||||
private static String byte2Hex(byte[] bytes) {
|
||||
StringBuffer stringBuffer = new StringBuffer();
|
||||
String temp = null;
|
||||
for (int i = 0; i < bytes.length; i++) {
|
||||
temp = Integer.toHexString(bytes[i] & 0xFF);
|
||||
if (temp.length() == 1) {
|
||||
//1得到一位的进行补0操作
|
||||
stringBuffer.append("0");
|
||||
}
|
||||
stringBuffer.append(temp);
|
||||
}
|
||||
return stringBuffer.toString();
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,45 @@
|
||||
package com.ho.datacollect;
|
||||
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
import com.ho.common.tools.service.RedisService;
|
||||
import com.ho.datacollect.config.SendMsgMqttClient;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.test.context.SpringBootTest;
|
||||
import org.springframework.test.context.junit4.SpringRunner;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* @author fancl
|
||||
* @desc: DateCollect模块自测
|
||||
* @date 2023/2/11
|
||||
*/
|
||||
@SpringBootTest
|
||||
@RunWith(SpringRunner.class)
|
||||
public class DataCollectTest {
|
||||
|
||||
@Autowired
|
||||
RedisService redisService;
|
||||
|
||||
@Autowired
|
||||
SendMsgMqttClient mqttClient;
|
||||
|
||||
@Test
|
||||
public void t(){
|
||||
Map<String,String> map = new HashMap<>();
|
||||
map.put("token", "v1:4fb062aff1c9270209db");
|
||||
String jsonString = JSONObject.toJSONString(map);
|
||||
String topic = "1/cloud/APO1231564/read/request";
|
||||
mqttClient.publish(topic,jsonString);
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
}
|
||||
Reference in New Issue
Block a user