add rtdata idx
This commit is contained in:
@@ -68,6 +68,9 @@ extern XmlConfig xmlcfg2;//角型接线xml节点解析的数据-默认映射文
|
|||||||
extern std::list<CTopic*> topicList2; //角型接线发送主题链表
|
extern std::list<CTopic*> topicList2; //角型接线发送主题链表
|
||||||
extern std::map<std::string, Xmldata*> xmlinfo_list2;//保存所有型号角形接线对应的icd映射文件解析数据
|
extern std::map<std::string, Xmldata*> xmlinfo_list2;//保存所有型号角形接线对应的icd映射文件解析数据
|
||||||
|
|
||||||
|
//////////////////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
extern time_t ConvertToTimestamp(const tagTime& time);
|
||||||
|
|
||||||
////////////////////////////////////////////////////////////////////////////////////////////////////
|
////////////////////////////////////////////////////////////////////////////////////////////////////
|
||||||
static std::mutex g_filemenu_cache_mtx;
|
static std::mutex g_filemenu_cache_mtx;
|
||||||
std::map<std::string, std::vector<tag_dir_info>> g_filemenu_cache;
|
std::map<std::string, std::vector<tag_dir_info>> g_filemenu_cache;
|
||||||
@@ -5110,8 +5113,124 @@ bool append_qvvr_event(const std::string& terminal_id,
|
|||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/////////////////////////////////////////////////////////////////////////////////////////////////////////////实时数据封装发送
|
||||||
|
void enqueue_realtime_pq(const RealtagPqDate_float& realdata,
|
||||||
|
int nPTType,
|
||||||
|
unsigned char cid,
|
||||||
|
const std::string& mac,
|
||||||
|
const std::string& devid)
|
||||||
|
{
|
||||||
|
// 先根据 devIdxMap 的配置决定编码分支:
|
||||||
|
// 2: 基础数据 3: 谐波电压含有率 4: 谐波电流有效值 5: 间谐波电压含有率
|
||||||
|
int idx = 0;
|
||||||
|
std::string base64;
|
||||||
|
|
||||||
|
// 这里尝试用 mac 作为 key 获取 idx;如果项目里 devIdxMap 的 key 不是 mac,
|
||||||
|
// 你可以把这里改成对应的设备 id(devid)。未命中则再尝试用规范化后的 mac。
|
||||||
|
if (devidx_get(devid, idx)) {
|
||||||
|
switch (idx) {
|
||||||
|
case 2: // 基础数据(根据接线方式选择转换方法 数据全集解析)
|
||||||
|
base64 = realdata.ConvertToBase64(nPTType);
|
||||||
|
break;
|
||||||
|
case 3: // 谐波电压含有率
|
||||||
|
base64 = realdata.ConvertToBase64_RtHarmV(nPTType);
|
||||||
|
break;
|
||||||
|
case 4: // 谐波电流有效值(幅值)
|
||||||
|
base64 = realdata.ConvertToBase64_RtHarmI();
|
||||||
|
break;
|
||||||
|
case 5: // 间谐波电压含有率
|
||||||
|
base64 = realdata.ConvertToBase64_RtInHarmV();
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
// 未知 idx,回退到基础数据
|
||||||
|
base64 = realdata.ConvertToBase64(nPTType);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// 未配置 idx,回退到基础数据
|
||||||
|
base64 = realdata.ConvertToBase64(nPTType);
|
||||||
|
}
|
||||||
|
//std::cout << base64 << std::endl;
|
||||||
|
|
||||||
|
//lnk实时数据使用接口发送20250711
|
||||||
|
time_t data_time = ConvertToTimestamp(realdata.time);
|
||||||
|
|
||||||
|
std::vector<DataArrayItem> arr;
|
||||||
|
arr.push_back({1, //数据属性 -1-无, 0-“Rt”,1-“Max”,2-“Min”,3-“Avg”,4-“Cp95”
|
||||||
|
data_time, //数据转换出来的时间,数据时标,相对1970年的秒,无效填入“-1”
|
||||||
|
0, //数据时标,微秒钟,无效填入“-1”
|
||||||
|
0, //数据标识,1-标识数据异常
|
||||||
|
base64});
|
||||||
|
std::string js = generate_json(
|
||||||
|
normalize_mac(mac),
|
||||||
|
-1, //需应答的报文订阅者收到后需以此ID应答,无需应答填入“-1”
|
||||||
|
1, //设备唯一标识Ldid,填入0代表Ndid,后续根据商议决定填id还是数字
|
||||||
|
1, //报文处理的优先级:1 I类紧急请求/响应 2 II类紧急请求/响应 3 普通请求/响应 4 广播报文
|
||||||
|
0x1302, //设备数据主动上送的数据类型
|
||||||
|
cid, //逻辑子设备ID,0-逻辑设备本身,无填-1
|
||||||
|
0x04, //数据类型固定为电能质量数据
|
||||||
|
1, //数据属性:无“0”、实时“1”、统计“2”等
|
||||||
|
idx, //数据集序号(以数据集方式上送),无填-1
|
||||||
|
arr //数据数组
|
||||||
|
);
|
||||||
|
//std::cout << js << std::en
|
||||||
|
queue_data_t data;
|
||||||
|
data.monitor_no = 1; //上送的实时数据没有测点序号,统一填1
|
||||||
|
data.strTopic = TOPIC_RTDATA; //实时topic
|
||||||
|
data.strText = js;
|
||||||
|
data.mp_id = ""; //监测点id,暂时不需要
|
||||||
|
data.tag = G_RT_TAG; //实时tag
|
||||||
|
data.key = G_RT_KEY; //实时key
|
||||||
|
std::lock_guard<std::mutex> lock(queue_data_list_mutex);
|
||||||
|
queue_data_list.push_back(data);
|
||||||
|
}
|
||||||
|
|
||||||
|
////////////////////////////////////////////////////////////////////////////////统计数据打包发送
|
||||||
|
|
||||||
|
// 封装:组装统计数据并入队发送
|
||||||
|
void enqueue_stat_pq(const std::string& max_base64Str,
|
||||||
|
const std::string& min_base64Str,
|
||||||
|
const std::string& avg_base64Str,
|
||||||
|
const std::string& cp95_base64Str,
|
||||||
|
time_t data_time,
|
||||||
|
const std::string& mac,
|
||||||
|
short cid)
|
||||||
|
{
|
||||||
|
std::vector<DataArrayItem> arr;
|
||||||
|
arr.push_back({1, //数据属性 -1-无, 0-“Rt”,1-“Max”,2-“Min”,3-“Avg”,4-“Cp95”
|
||||||
|
data_time, //数据转换出来的时间,数据时标,相对1970年的秒,无效填入“-1”
|
||||||
|
0, //数据时标,微秒钟,无效填入“-1”
|
||||||
|
0, //数据标识,1-标识数据异常
|
||||||
|
max_base64Str});
|
||||||
|
arr.push_back({2, data_time, 0, 0, min_base64Str});
|
||||||
|
arr.push_back({3, data_time, 0, 0, avg_base64Str});
|
||||||
|
arr.push_back({4, data_time, 0, 0, cp95_base64Str});
|
||||||
|
|
||||||
|
std::string js = generate_json(
|
||||||
|
normalize_mac(mac),
|
||||||
|
-1, //需应答的报文订阅者收到后需以此ID应答,无需应答填入“-1”
|
||||||
|
1, //设备唯一标识Ldid,填入0代表Ndid,后续根据商议决定填id还是数字
|
||||||
|
1, //报文处理的优先级:1 I类紧急请求/响应 2 II类紧急请求/响应 3 普通请求/响应 4 广播报文
|
||||||
|
0x1302, //设备数据主动上送的数据类型
|
||||||
|
cid, //逻辑子设备ID,0-逻辑设备本身,无填-1(原:avg_data.name)
|
||||||
|
0x04, //数据类型固定为电能质量
|
||||||
|
2, //数据属性:无“0”、实时“1”、统计“2”等
|
||||||
|
1, //数据集序号(以数据集方式上送),无填-1
|
||||||
|
arr //数据数组
|
||||||
|
);
|
||||||
|
|
||||||
|
//std::cout << js << std::endl;
|
||||||
|
|
||||||
|
queue_data_t data;
|
||||||
|
data.monitor_no = cid; //监测点序号(原:avg_data.name)
|
||||||
|
data.strTopic = TOPIC_STAT;//统计topic
|
||||||
|
data.strText = js;
|
||||||
|
data.mp_id = ""; //监测点id,暂时不需要
|
||||||
|
data.tag = G_ROCKETMQ_TAG; //统计tag
|
||||||
|
data.key = G_ROCKETMQ_KEY; //统计key
|
||||||
|
std::lock_guard<std::mutex> lock(queue_data_list_mutex);
|
||||||
|
queue_data_list.push_back(data);
|
||||||
|
|
||||||
|
std::cout << "Successfully assembled tagPqData for line: "
|
||||||
|
<< cid << std::endl;
|
||||||
|
}
|
||||||
@@ -730,6 +730,22 @@ inline bool devidx_get(const std::string& id, int& out_idx) {
|
|||||||
out_idx = it->second;
|
out_idx = it->second;
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void enqueue_realtime_pq(const RealtagPqDate_float& realdata,
|
||||||
|
int nPTType,
|
||||||
|
unsigned char cid,
|
||||||
|
const std::string& mac,
|
||||||
|
const std::string& devid);
|
||||||
|
|
||||||
|
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////统计数据
|
||||||
|
void enqueue_stat_pq(const std::string& max_base64Str,
|
||||||
|
const std::string& min_base64Str,
|
||||||
|
const std::string& avg_base64Str,
|
||||||
|
const std::string& cp95_base64Str,
|
||||||
|
time_t data_time,
|
||||||
|
const std::string& mac,
|
||||||
|
short cid);
|
||||||
|
|
||||||
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////
|
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////
|
||||||
extern int g_front_seg_index;
|
extern int g_front_seg_index;
|
||||||
extern std::string FRONT_INST;
|
extern std::string FRONT_INST;
|
||||||
|
|||||||
@@ -346,7 +346,7 @@ void my_rocketmq_send(queue_data_t& data,rocketmq::RocketMQProducer* producer)
|
|||||||
|
|
||||||
/////////////////////////////////////////////////////////////////////////////////////////////////回调函数的json处理
|
/////////////////////////////////////////////////////////////////////////////////////////////////回调函数的json处理
|
||||||
|
|
||||||
bool parseJsonMessageRT(const std::string& body,std::string& devSeries,ushort& line,bool& realData,bool& soeData,int& limit,int& Idx){
|
bool parseJsonMessageRT(const std::string& body,std::string& devSeries,ushort& line,bool& realData,bool& soeData,int& limit,int& idx){
|
||||||
json root;
|
json root;
|
||||||
try {
|
try {
|
||||||
root = json::parse(body);
|
root = json::parse(body);
|
||||||
@@ -382,7 +382,7 @@ bool parseJsonMessageRT(const std::string& body,std::string& devSeries,ushort& l
|
|||||||
!messageBody.contains("realData") ||
|
!messageBody.contains("realData") ||
|
||||||
!messageBody.contains("soeData") ||
|
!messageBody.contains("soeData") ||
|
||||||
!messageBody.contains("limit")||
|
!messageBody.contains("limit")||
|
||||||
!messageBody.contains("Idx"))
|
!messageBody.contains("idx"))
|
||||||
{
|
{
|
||||||
std::cerr << "Missing expected fields in 'messageBody'." << std::endl;
|
std::cerr << "Missing expected fields in 'messageBody'." << std::endl;
|
||||||
return false;
|
return false;
|
||||||
@@ -394,7 +394,7 @@ bool parseJsonMessageRT(const std::string& body,std::string& devSeries,ushort& l
|
|||||||
realData = messageBody["realData"].get<bool>();
|
realData = messageBody["realData"].get<bool>();
|
||||||
soeData = messageBody["soeData"].get<bool>();
|
soeData = messageBody["soeData"].get<bool>();
|
||||||
limit = messageBody["limit"].get<int>();
|
limit = messageBody["limit"].get<int>();
|
||||||
int idx = messageBody["Idx"].get<int>();
|
idx = messageBody["idx"].get<int>();
|
||||||
} catch (const std::exception& e) {
|
} catch (const std::exception& e) {
|
||||||
std::cerr << "Type error while extracting fields: " << e.what() << std::endl;
|
std::cerr << "Type error while extracting fields: " << e.what() << std::endl;
|
||||||
return false;
|
return false;
|
||||||
|
|||||||
@@ -498,58 +498,13 @@ void process_received_message(string mac, string id,const char* data, size_t len
|
|||||||
//lnk20250708使用接口发送
|
//lnk20250708使用接口发送
|
||||||
time_t data_time = ConvertToTimestamp(avg_data.time);
|
time_t data_time = ConvertToTimestamp(avg_data.time);
|
||||||
|
|
||||||
std::vector<DataArrayItem> arr;
|
enqueue_stat_pq(max_base64Str,
|
||||||
arr.push_back({1, //数据属性 -1-无, 0-“Rt”,1-“Max”,2-“Min”,3-“Avg”,4-“Cp95”
|
min_base64Str,
|
||||||
data_time, //数据转换出来的时间,数据时标,相对1970年的秒,无效填入“-1”
|
avg_base64Str,
|
||||||
0, //数据时标,微秒钟,无效填入“-1”
|
cp95_base64Str,
|
||||||
0, //数据标识,1-标识数据异常
|
data_time,
|
||||||
max_base64Str});
|
mac,
|
||||||
arr.push_back({2, data_time, 0, 0, min_base64Str});
|
avg_data.name);
|
||||||
arr.push_back({3, data_time, 0, 0, avg_base64Str});
|
|
||||||
arr.push_back({4, data_time, 0, 0, cp95_base64Str});
|
|
||||||
|
|
||||||
std::string js = generate_json(
|
|
||||||
normalize_mac(mac),
|
|
||||||
-1, //需应答的报文订阅者收到后需以此ID应答,无需应答填入“-1”
|
|
||||||
1, //设备唯一标识Ldid,填入0代表Ndid,后续根据商议决定填id还是数字
|
|
||||||
1, //报文处理的优先级:1 I类紧急请求/响应 2 II类紧急请求/响应 3 普通请求/响应 4 广播报文
|
|
||||||
0x1302, //设备数据主动上送的数据类型
|
|
||||||
avg_data.name, //逻辑子设备ID,0-逻辑设备本身,无填-1
|
|
||||||
0x04, //数据类型固定为电能质量
|
|
||||||
2, //数据属性:无“0”、实时“1”、统计“2”等
|
|
||||||
1, //数据集序号(以数据集方式上送),无填-1
|
|
||||||
arr //数据数组
|
|
||||||
);
|
|
||||||
|
|
||||||
//std::cout << js << std::endl;
|
|
||||||
|
|
||||||
//// 创建输出流并打开文件(覆盖模式)
|
|
||||||
//std::ofstream outFile("json.txt"); // 等价于 std::ofstream outFile(filePath, std::ios::out);
|
|
||||||
|
|
||||||
//if (outFile.is_open()) { // 检查文件是否成功打开
|
|
||||||
// outFile << js; // 写入字符串
|
|
||||||
// outFile.close(); // 关闭文件
|
|
||||||
// // 成功提示(实际应用中建议使用日志)
|
|
||||||
//}
|
|
||||||
//else {
|
|
||||||
// // 错误处理:文件打开失败(如路径不存在)
|
|
||||||
//}
|
|
||||||
|
|
||||||
queue_data_t data;
|
|
||||||
data.monitor_no = avg_data.name; //监测点序号
|
|
||||||
data.strTopic = TOPIC_STAT;//统计topic
|
|
||||||
data.strText = js;
|
|
||||||
data.mp_id = ""; //监测点id,暂时不需要
|
|
||||||
data.tag = G_ROCKETMQ_TAG; //统计tag
|
|
||||||
data.key = G_ROCKETMQ_KEY; //统计key
|
|
||||||
std::lock_guard<std::mutex> lock(queue_data_list_mutex);
|
|
||||||
queue_data_list.push_back(data);
|
|
||||||
|
|
||||||
std::cout << "Successfully assembled tagPqData for line: "
|
|
||||||
<< avg_data.name << std::endl;
|
|
||||||
// 输出结果
|
|
||||||
//std::cout << "Base64 Encoded Data (" << max_data.CalculateFloatCount()
|
|
||||||
// << " floats): " << base64Str << std::endl;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -685,41 +640,8 @@ void process_received_message(string mac, string id,const char* data, size_t len
|
|||||||
strScale,
|
strScale,
|
||||||
nPTType);
|
nPTType);
|
||||||
|
|
||||||
std::string base64 = realdata.ConvertToBase64(nPTType);
|
// 转换为Base64字符串并发送lnk20250924
|
||||||
//std::cout << base64 << std::endl;
|
enqueue_realtime_pq(realdata, nPTType, cid, mac, id);
|
||||||
|
|
||||||
//lnk实时数据使用接口发送20250711
|
|
||||||
time_t data_time = ConvertToTimestamp(realdata.time);
|
|
||||||
|
|
||||||
std::vector<DataArrayItem> arr;
|
|
||||||
arr.push_back({1, //数据属性 -1-无, 0-“Rt”,1-“Max”,2-“Min”,3-“Avg”,4-“Cp95”
|
|
||||||
data_time, //数据转换出来的时间,数据时标,相对1970年的秒,无效填入“-1”
|
|
||||||
0, //数据时标,微秒钟,无效填入“-1”
|
|
||||||
0, //数据标识,1-标识数据异常
|
|
||||||
base64});
|
|
||||||
std::string js = generate_json(
|
|
||||||
normalize_mac(mac),
|
|
||||||
-1, //需应答的报文订阅者收到后需以此ID应答,无需应答填入“-1”
|
|
||||||
1, //设备唯一标识Ldid,填入0代表Ndid,后续根据商议决定填id还是数字
|
|
||||||
1, //报文处理的优先级:1 I类紧急请求/响应 2 II类紧急请求/响应 3 普通请求/响应 4 广播报文
|
|
||||||
0x1302, //设备数据主动上送的数据类型
|
|
||||||
cid, //逻辑子设备ID,0-逻辑设备本身,无填-1
|
|
||||||
0x04, //数据类型固定为电能质量数据
|
|
||||||
1, //数据属性:无“0”、实时“1”、统计“2”等
|
|
||||||
2, //数据集序号(以数据集方式上送),无填-1
|
|
||||||
arr //数据数组
|
|
||||||
);
|
|
||||||
//std::cout << js << std::en
|
|
||||||
queue_data_t data;
|
|
||||||
data.monitor_no = 1; //上送的实时数据没有测点序号,统一填1
|
|
||||||
data.strTopic = TOPIC_RTDATA; //实时topic
|
|
||||||
data.strText = js;
|
|
||||||
data.mp_id = ""; //监测点id,暂时不需要
|
|
||||||
data.tag = G_RT_TAG; //实时tag
|
|
||||||
data.key = G_RT_KEY; //实时key
|
|
||||||
std::lock_guard<std::mutex> lock(queue_data_list_mutex);
|
|
||||||
queue_data_list.push_back(data);
|
|
||||||
|
|
||||||
|
|
||||||
// 处理完成后重置状态
|
// 处理完成后重置状态
|
||||||
ClientManager::instance().change_device_state(id, DeviceState::IDLE);
|
ClientManager::instance().change_device_state(id, DeviceState::IDLE);
|
||||||
|
|||||||
Reference in New Issue
Block a user