您的位置:首页 > 游戏 > 手游 > C#实现数据采集系统-数据反写(2)消息内容处理和写入通信类队列

C#实现数据采集系统-数据反写(2)消息内容处理和写入通信类队列

2024/9/24 19:16:10 来源:https://blog.csdn.net/qq_39427511/article/details/141440022  浏览:    关键词:C#实现数据采集系统-数据反写(2)消息内容处理和写入通信类队列

C#实现数据采集系统-数据反写

实现步骤

  1. MQTT订阅,接收消息 链接-MQTT订阅接收消息
  2. 反写内容写入通信类,添加到写入队列中
  3. 实现Modbustcp通信写入

具体实现

2. 消息内容写入通信类,添加到写入队列中

在服务类DAqService中添加通信集合_modbusTcps用于存储每个设备的通信类,使用键值对Dictionary存储设备ID和通信类,用于快速查找

然后在启动的时候,订阅各个设备ID的写入主题,添加控制方法DeviceControl

    public class DAqService{public static string MainTopic = "DTSDAQ/";private Dictionary<string, ModbusTcp> _modbusTcps;public DAqService(DAqOption option){_modbusTcps = new Dictionary<string, ModbusTcp>();//...}/// <summary>/// 启动服务/// </summary>public void Start(){MqttControllor = new MqttControllor(_option.MqttConfig);foreach (var item in _deviceLinks){ModbusTcp modbusTcp = new ModbusTcp(item);modbusTcp.DoMonitor();modbusTcp.ValueUpdated += ModbusTcp_ValueUpdated;//将_modbusTcps.Add(item.UID, modbusTcp);MqttControllor.SubscribeTopic($"{MainTopic}{item.UID}/write", DeviceControl);}if (_serviceConfig.IsPushScheduled){timer.Start();}}}

实现消息订阅方法-设备控制DeviceControl

处理消息,将消息转换成对应点位和值,然后调用modbustcp的写入方法

 <summary>
/// 设备控制,反写
/// </summary>
/// <param name="topic"></param>
/// <param name="msg"></param>
private void DeviceControl(string topic, string msg)
{var message = JsonSerializer.Deserialize<DeviceMessage>(msg);//如果消息不为null,则通过设备id找到对应的modbustcp对象,并写入值if (message != null){var link = _deviceLinks.FirstOrDefault(x => x.UID == message.DeviceId); //通过设备id找到对应的modbustcp对象if (link != null){var modbusTcp = _modbusTcps[link.UID]; //通过设备id找到对应的modbustcp对象//循环消息中的数据对象,然后再设备link对象点位中找到对应的点位对象,并写入值foreach (var item in message.Data){var point = link.Points.FirstOrDefault(x => x.UID == item.Key); //通过点位id找到对应的点位对象if (point != null){var parseMethod = point.Type.GetMethod("Parse",BindingFlags.Public | BindingFlags.Static,new[] { typeof(string) });point.WriteValue = parseMethod.Invoke(null,new object[] { item.Value.ToString() }); //通过点位id找到对应的点位对象}modbusTcp.Write(point);}}}
}

在ModbusTcp通信类中,添加一个写入队列和写入方法,写入点位先添加在队列中,然后再读数据间隙中,实现写入

 public class ModbusTcp{/// <summary>/// 写入队列/// </summary>private Queue<RegisterPoint> _writeQueue = new Queue<RegisterPoint>();·//写入值先加入一个队列public void Write(RegisterPoint point){_writeQueue.Enqueue(point);}}

完整代码

public class DAqService
{public static string MainTopic = "DTSDAQ/";private MqttControllor MqttControllor;private Dictionary<string, ModbusTcp> _modbusTcps;private DAqOption _option;private List<DeviceLink> _deviceLinks;private ServiceConfig _serviceConfig;private System.Timers.Timer timer;public DAqService(DAqOption option){_modbusTcps = new Dictionary<string, ModbusTcp>();_option = option;_deviceLinks = option.DeviceLinks;_serviceConfig = option.ServiceConfig;timer = new System.Timers.Timer(_serviceConfig.PushTimeSpan * 1000);timer.Elapsed += Timer_Elapsed;}/// <summary>/// 启动服务/// </summary>public void Start(){MqttControllor = new MqttControllor(_option.MqttConfig);foreach (var item in _deviceLinks){ModbusTcp modbusTcp = new ModbusTcp(item);modbusTcp.DoMonitor();modbusTcp.ValueUpdated += ModbusTcp_ValueUpdated;_modbusTcps.Add(item.UID, modbusTcp);MqttControllor.SubscribeTopic($"{MainTopic}{item.UID}/write", DeviceControl);}if (_serviceConfig.IsPushScheduled){timer.Start();}}/// <summary>/// 设备控制,反写/// </summary>/// <param name="topic"></param>/// <param name="msg"></param>private void DeviceControl(string topic, string msg){var message = JsonSerializer.Deserialize<DeviceMessage>(msg);//如果消息不为null,则通过设备id找到对应的modbustcp对象,并写入值if (message != null){var link = _deviceLinks.FirstOrDefault(x => x.UID == message.DeviceId); //通过设备id找到对应的modbustcp对象if (link != null){var modbusTcp = _modbusTcps[link.UID]; //通过设备id找到对应的modbustcp对象//循环消息中的数据对象,然后再设备link对象点位中找到对应的点位对象,并写入值foreach (var item in message.Data){var point = link.Points.FirstOrDefault(x => x.UID == item.Key); //通过点位id找到对应的点位对象if (point != null){var parseMethod = point.Type.GetMethod("Parse",BindingFlags.Public | BindingFlags.Static,new[] { typeof(string) });point.WriteValue = parseMethod.Invoke(null,new object[] { item.Value.ToString() }); //通过点位id找到对应的点位对象}modbusTcp.Write(point);}}}}private void Timer_Elapsed(object? sender, ElapsedEventArgs e){foreach (var link in _deviceLinks){try{DeviceMessage device = new DeviceMessage { DeviceId = link.UID };foreach (RegisterPoint point in link.Points){// Console.WriteLine($"Point:{point.UID}-->Value:{point.Value}");device.Data.Add(point.UID, point.Value);}var data = JsonSerializer.Serialize(device);MqttControllor.Publish($"{MainTopic}{link.UID}/Time", data); //定时推送}catch (Exception ex){Console.WriteLine(ex.Message);}}}private void ModbusTcp_ValueUpdated(RegisterPoint point, object value){if (_serviceConfig.IsPushChanged){try{DeviceMessage device = new DeviceMessage { DeviceId = point.DeviceId };device.Data.Add(point.UID, value);var data = JsonSerializer.Serialize(device);MqttControllor.Publish($"{MainTopic}{point.DeviceId}/Update", data); //采集立刻推送}catch (Exception ex){Console.WriteLine(ex.Message);}}Console.WriteLine($"Point:{point.UID}-->Value:{value}");}
}

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com