过去几个月,俄乌战场的态势发生了一个根本性变化。 不是某个村庄的易手,不是某条战线的推进,而是一个整体趋势的逆转:俄罗斯自2024年1月以来,首次净失...
2026-05-27 3
↓推荐关注↓
前言
开发环境
VS 2022 + .NET 6 + WebAPI / 控制台
1、新建一个WebAPI项目,用来后面做测试使用
2、新建一个继承自IHostedService的服务,用于随着webapi程序的启动而自动执行。(最终代码在文末)
3、引入 MQTTNet 包,该项目提供了.net环境下的MQTT通信协议支持,这款框架很优秀,此处直接引用它来进行使用。
展开全文
4、在上面的MqttHostService类里面,开始方法里面新增初始化MQTT服务端的一些功能,例如 IP、端口号、事件等等。
5、mqtt服务端支持的一系列功能很多,大佬们可以自行去尝试一些新发现,此处只使用若干个简单功能。
6、添加客户端连接事件、连接关闭事件
7、由于事件要用的可能有点多,此处就不一一例举了,可以直接看以下的代码,以及有关注释来理解。
8、事件触发时候,打印输出
9、输出之前,记录一个当前事件名称标记一下,用于可以更加清楚看出是哪个事件输出的。
10、对MqttHostService类进行注册,用于程序启动时候跟随启动。
11、上面貌似设计的不是很友好,所以把mqtt服务实例单独弄出来,写入到单独的类里面做成属性,供方便调用。
12、把先前的一些东西改一下,换成使用上面步骤的属性来直接调用使用。
13、运行一下,看看是否可以成功,显示服务已启动,说明服务启动时OK的了.
14、新增一个控制台程序 MqttClient,用于模拟客户端。
15、创建客户端启动以及有关配置信息和有关事件,如图。具体使用可以看代码注释,就不过多解释了。
16、在program类里面,调用客户端启动方法,用于测试使用。
17、上面客户端对应的三个事件的实现如图,同时进行有关信息的打印输出。
19、密码改回正常匹配项以后,再重新运行试试看,可以看到客户端与服务端连接上了。
20、如果关闭客户端,也可以看到服务端会进入客户端关闭事件内。
22、接下来测试服务端消息推送,在MqttService服务里面,新增一个方法,用来执行mqtt服务端发布主题消息使用。有关配置信息和消息格式,如图所示。
23、新增一个API控制器,用来测试使用。API参数直接拿来进行消息的推送使用。
25、接下来对客户端新增一个消息推送的方法,用来测试客户端消息发布的功能。有关消息格式和调用,如图所示,以及注释部分的说明。
26、客户端program类里面,客户端连接以后,通过手动回车,来执行客户端发布消息。
27、再次启动服务端和客户端
28、然后客户端内按一下回车,执行消息发布功能。可以看到,服务端成功接收到了客户端发过来的主题消息。
30、然后本地的代码进行一些修改,用来当做第二个客户端程序。所以客户端id也进行变更为 testclient02
32、启动服务端,以及拷贝出来的客户端1,和上面修改了部分代码的客户端2,保证都已经连接上服务端。
33、调用服务端的api接口,由于服务端发布的消息是发布给topic_01的,所以只有客户端1可以接收到消息。
34、客户端1执行回车,用于发布一段消息给主题 topic_02,可以看到客户端01发布的消息,同时被服务端和客户端02接收到了。因为服务端是总指挥,所以客户端发布的消息都会经过服务端,从而服务端都可以接收到连接的客户端发布的所有消息。
36、最终的服务端代码:
MqttHostService:
publicclassMqttHostService: IHostedService, IDisposable
publicvoidDispose
conststringServerClientId = "SERVER";
publicTask StartAsync( CancellationToken cancellationToken)
MqttServerOptionsBuilder optionsBuilder = newMqttServerOptionsBuilder;
optionsBuilder.WithDefaultEndpoint;
optionsBuilder.WithDefaultEndpointPort( 10086); // 设置 服务端 端口号
optionsBuilder.WithConnectionBacklog( 1000); // 最大连接数
MqttServerOptions options = optionsBuilder.Build;
MqttService._mqttServer = newMqttFactory.CreateMqttServer(options);
MqttService._mqttServer.ClientConnectedAsync += _mqttServer_ClientConnectedAsync; //客户端连接事件
MqttService._mqttServer.ClientDisconnectedAsync += _mqttServer_ClientDisconnectedAsync; // 客户端关闭事件
MqttService._mqttServer.ApplicationMessageNotConsumedAsync += _mqttServer_ApplicationMessageNotConsumedAsync; // 消息接收事件
MqttService._mqttServer.ClientSubscribedTopicAsync += _mqttServer_ClientSubscribedTopicAsync; // 客户端订阅主题事件
MqttService._mqttServer.ClientUnsubscribedTopicAsync += _mqttServer_ClientUnsubscribedTopicAsync; // 客户端取消订阅事件
MqttService._mqttServer.StartedAsync += _mqttServer_StartedAsync; // 启动后事件
MqttService._mqttServer.StoppedAsync += _mqttServer_StoppedAsync; // 关闭后事件
MqttService._mqttServer.InterceptingPublishAsync += _mqttServer_InterceptingPublishAsync; // 消息接收事件
MqttService._mqttServer.ValidatingConnectionAsync += _mqttServer_ValidatingConnectionAsync; // 用户名和密码验证有关
MqttService._mqttServer.StartAsync;
returnTask.CompletedTask;
///<summary>
///客户端订阅主题事件
///</summary>
///<param name="arg"></param>
///<returns></returns>
privateTask _mqttServer_ClientSubscribedTopicAsync(ClientSubscribedTopicEventArgs arg)
Console.WriteLine( $"ClientSubscribedTopicAsync:客户端ID=【 {arg.ClientId}】订阅的主题=【 {arg.TopicFilter}】 " );
returnTask.CompletedTask;
///<summary>
///关闭后事件
///</summary>
///<param name="arg"></param>
///<returns></returns>
privateTask _mqttServer_StoppedAsync(EventArgs arg)
Console.WriteLine( $"StoppedAsync:MQTT服务已关闭……");
returnTask.CompletedTask;
///<summary>
///用户名和密码验证有关
///</summary>
///<param name="arg"></param>
///<returns></returns>
privateTask _mqttServer_ValidatingConnectionAsync(ValidatingConnectionEventArgs arg)
arg.ReasonCode = MqttConnectReasonCode.Success;
if((arg.Username ?? string.Empty)!= "admin"|| (arg.Password??String.Empty)!= "123456")
arg.ReasonCode = MqttConnectReasonCode.Banned;
Console.WriteLine( $"ValidatingConnectionAsync:客户端ID=【 {arg.ClientId}】用户名或密码验证错误 " );
returnTask.CompletedTask;
///<summary>
///消息接收事件
///</summary>
///<param name="arg"></param>
///<returns></returns>
privateTask _mqttServer_InterceptingPublishAsync(InterceptingPublishEventArgs arg)
if( string.Equals(arg.ClientId, ServerClientId))
returnTask.CompletedTask;
Console.WriteLine( $"InterceptingPublishAsync:客户端ID=【 {arg.ClientId}】 Topic主题=【 {arg.ApplicationMessage.Topic}】 消息=【 {Encoding.UTF8.GetString(arg.ApplicationMessage.Payload)}】 qos等级=【 {arg.ApplicationMessage.QualityOfServiceLevel}】" );
returnTask.CompletedTask;
///<summary>
///启动后事件
///</summary>
///<param name="arg"></param>
///<returns></returns>
privateTask _mqttServer_StartedAsync(EventArgs arg)
Console.WriteLine( $"StartedAsync:MQTT服务已启动……");
returnTask.CompletedTask;
///<summary>
///客户端取消订阅事件
///</summary>
///<param name="arg"></param>
///<returns></returns>
privateTask _mqttServer_ClientUnsubscribedTopicAsync(ClientUnsubscribedTopicEventArgs arg)
Console.WriteLine( $"ClientUnsubscribedTopicAsync:客户端ID=【 {arg.ClientId}】已取消订阅的主题=【 {arg.TopicFilter}】 " );
returnTask.CompletedTask;
privateTask _mqttServer_ApplicationMessageNotConsumedAsync(ApplicationMessageNotConsumedEventArgs arg)
Console.WriteLine( $"ApplicationMessageNotConsumedAsync:发送端ID=【 {arg.SenderId}】 Topic主题=【 {arg.ApplicationMessage.Topic}】 消息=【 {Encoding.UTF8.GetString(arg.ApplicationMessage.Payload)}】 qos等级=【 {arg.ApplicationMessage.QualityOfServiceLevel}】" );
returnTask.CompletedTask;
///<summary>
///客户端断开时候触发
///</summary>
///<param name="arg"></param>
///<returns></returns>
///<exception cref="NotImplementedException"></exception>
privateTask _mqttServer_ClientDisconnectedAsync(ClientDisconnectedEventArgs arg)
Console.WriteLine( $"ClientDisconnectedAsync:客户端ID=【 {arg.ClientId}】已断开, 地址=【 {arg.Endpoint}】 " );
returnTask.CompletedTask;
///<summary>
///客户端连接时候触发
///</summary>
///<param name="arg"></param>
///<returns></returns>
privateTask _mqttServer_ClientConnectedAsync(ClientConnectedEventArgs arg)
Console.WriteLine( $"ClientConnectedAsync:客户端ID=【 {arg.ClientId}】已连接, 用户名=【 {arg.UserName}】地址=【 {arg.Endpoint}】 " );
returnTask.CompletedTask;
publicTask StopAsync( CancellationToken cancellationToken)
returnTask.CompletedTask;
Service:
publicclassMqttService
publicstaticMqttServer _mqttServer { get; set; }
publicstaticvoidPublishData( stringdata )
varmessage = newMqttApplicationMessage
Topic = "topic_01",
Payload = Encoding.Default.GetBytes(data),
QualityOfServiceLevel = MqttQualityOfServiceLevel.AtLeastOnce,
Retain = true// 服务端是否保留消息。true为保留,如果有新的订阅者连接,就会立马收到该消息。
_mqttServer.InjectApplicationMessage( newInjectedMqttApplicationMessage(message) // 发送消息给有订阅 topic_01的客户端
SenderClientId = "Server_01"
}).GetAwaiter.GetResult;
37、最终的客户端代码:
publicclassMqttClientService
publicstaticIMqttClient _mqttClient;
publicvoidMqttClientStart
varoptionsBuilder = newMqttClientOptionsBuilder
.WithTcpServer( "127.0.0.1", 10086) // 要访问的mqtt服务端的 ip 和 端口号
.WithCredentials( "admin", "123456") // 要访问的mqtt服务端的用户名和密码
.WithClientId( "testclient02") // 设置客户端id
.WithCleanSession
.WithTls( newMqttClientOptionsBuilderTlsParameters
UseTls = false// 是否使用 tls加密
varclientOptions = optionsBuilder.Build;
_mqttClient = newMqttFactory.CreateMqttClient;
_mqttClient.ConnectedAsync += _mqttClient_ConnectedAsync; // 客户端连接成功事件
_mqttClient.DisconnectedAsync += _mqttClient_DisconnectedAsync; // 客户端连接关闭事件
_mqttClient.ApplicationMessageReceivedAsync += _mqttClient_ApplicationMessageReceivedAsync; // 收到消息事件
_mqttClient.ConnectAsync(clientOptions);
///<summary>
///客户端连接关闭事件
///</summary>
///<param name="arg"></param>
///<returns></returns>
privateTask _mqttClient_DisconnectedAsync(MqttClientDisconnectedEventArgs arg)
Console.WriteLine( $"客户端已断开与服务端的连接……");
returnTask.CompletedTask;
///<summary>
///客户端连接成功事件
///</summary>
///<param name="arg"></param>
///<returns></returns>
privateTask _mqttClient_ConnectedAsync(MqttClientConnectedEventArgs arg)
Console.WriteLine( $"客户端已连接服务端……");
// 订阅消息主题
// MqttQualityOfServiceLevel: (QoS): 0 最多一次,接收者不确认收到消息,并且消息不被发送者存储和重新发送提供与底层 TCP 协议相同的保证。
// 1: 保证一条消息至少有一次会传递给接收方。发送方存储消息,直到它从接收方收到确认收到消息的数据包。一条消息可以多次发送或传递。
// 2: 保证每条消息仅由预期的收件人接收一次。级别2是最安全和最慢的服务质量级别,保证由发送方和接收方之间的至少两个请求/响应(四次握手)。
_mqttClient.SubscribeAsync( "topic_02", MqttQualityOfServiceLevel.AtLeastOnce);
returnTask.CompletedTask;
///<summary>
///收到消息事件
///</summary>
///<param name="arg"></param>
///<returns></returns>
privateTask _mqttClient_ApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs arg)
Console.WriteLine( $"ApplicationMessageReceivedAsync:客户端ID=【 {arg.ClientId}】接收到消息。Topic主题=【 {arg.ApplicationMessage.Topic}】 消息=【 {Encoding.UTF8.GetString(arg.ApplicationMessage.Payload)}】 qos等级=【 {arg.ApplicationMessage.QualityOfServiceLevel}】" );
returnTask.CompletedTask;
publicvoidPublish( stringdata )
varmessage = newMqttApplicationMessage
Topic = "topic_02",
Payload = Encoding.Default.GetBytes(data),
QualityOfServiceLevel = MqttQualityOfServiceLevel.AtLeastOnce,
Retain = true// 服务端是否保留消息。true为保留,如果有新的订阅者连接,就会立马收到该消息。
_mqttClient.PublishAsync(message);
总结
MQTT以上演示已经完毕,可以看到它的一些特性,跟websocket很接近,但是又比websocket通信更加灵活。其实,实际上MQTT的客户端在现实生产环境场景下,并不需要咱们开发者进行开发,很多硬件设备都支持提供MQTT协议的通信客户端,所以只需要自己搭建一个服务端,就可以实现实时监控各种设备推送过来的各种信号数据。
同时客户端支持发布消息给其他客户端,所以就实现了设备与设备之间的一对一信号通信的效果了。
如果需要下发信号给硬件设备,MQTT服务端也可以直接下发给某个指定设备来进行实现即可。上面案例只提供入门方案,如果有感兴趣的大佬,可以自己去拓展一下,来达到更好的效果。
转自:WeskyNet
转自:WeskyNet
- EOF -
点击标题可跳转
.NET 之延迟队列
.NET Core 生态大全(资源)
编译调试 .NET 6 源码
看完本文有收获?请转发分享给更多人
推荐关注「DotNet」,提升.Net技能
点赞和在看就是最大的支持❤️
相关文章
过去几个月,俄乌战场的态势发生了一个根本性变化。 不是某个村庄的易手,不是某条战线的推进,而是一个整体趋势的逆转:俄罗斯自2024年1月以来,首次净失...
2026-05-27 3
近年来,越来越多的科技公司、互联网公司CEO不再满足于在发布会上演讲,而是更生活化,直接走到台前。像黄仁勋那样逛街、喝豆汁,已经算是体面的整活方式了...
2026-05-27 6
韩国海军正式启动引进核潜艇的计划,无疑在东亚乃至全球范围内都引发了巨大的波澜。是什么原因让韩国在此刻选择加速这一军事化进程? 韩国决定推进核潜艇计划的...
2026-05-27 7
小米宝宝空调外机(来源:受访者) 信网·信号新闻5月19日讯近日,深圳市民柯先生向信号新闻(0532-80889431)反映,其在电商平台购买了一...
2026-05-20 3
【文/观察者网 山猫】 据印度尼西亚总统府官方网站报道,当地时间5月18日,印尼国民军(TNI)当天在雅加达哈利姆·珀达纳库苏马空军基地举行了“主要...
2026-05-20 3
枪手加冕,新王当立 在曼城客场挑战伯恩茅斯的比赛中,主队凭借朱尼尔·克鲁皮的进球以1-1战平曼城,这一结果直接宣告阿森纳提前锁定本赛季英格兰足球超级联...
2026-05-20 4
据大河报,5月20日凌晨,@汪涵 就电影《监狱来的妈妈》相关争议发布声明:有关我担任电影《监狱来的妈妈》出品人一事,本人特此声明。本人初衷为扶持文艺创...
2026-05-20 3
美国国会参议院19日以50票赞成、47票反对的表决结果,推进一项旨在限制总统特朗普动武权力的法案,要求特朗普今后对伊朗采取任何军事行动必须获得国会批准...
2026-05-20 4
发表评论