How Queues Should Be Used in .NET Core: RabbitMQ

The right way to use RabbitMQ in .NET Core

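First, the official site: http://www.rabbitmq.com/

Next, the .NET client documentation: http://www.rabbitmq.com/dotnet.html

GitHub repository: https://github.com/rabbitmq/rabbitmq-dotnet-client

Now to the main content, which covers two topics: how to write a consumer, and how to write a producer.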

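Consumer

In a dotnet core MVC application, a consumer should not be started by an API call or anything similar; it should start together with the application.

So...

In dotnet core 2.0 and later, we can implement this directly with the IHostedService interface.

  • Implementing background scheduled tasks in .NET Core with IHostedService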
  • Implementing background tasks in .NET Core 2.x webapps or microservices with IHostedService and the BackgroundService class

Straight to the code.

// RabbitListener.cs: the base class. It only connects to RabbitMQ and starts listening for messages;
// each consumer subclass sets its own RouteKey and QueueName and overrides the message handler Process.
using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace Test.Listener
{
    public class RabbitListener : IHostedService
    {
        private readonly IConnection connection;
        private readonly IModel channel;

        public RabbitListener(IOptions<AppConfiguration> options)
        {
            try
            {
                var factory = new ConnectionFactory()
                {
                    // This is my own configuration; change it to match yours.
                    HostName = options.Value.RabbitHost,
                    UserName = options.Value.RabbitUserName,
                    Password = options.Value.RabbitPassword,
                    Port = options.Value.RabbitPort,
                };
                this.connection = factory.CreateConnection();
                this.channel = connection.CreateModel();
            }
            catch (Exception ex)
            {
                Console.WriteLine($"RabbitListener init error,ex:{ex.Message}");
            }
        }

        public Task StartAsync(CancellationToken cancellationToken)
        {
            Register();
            return Task.CompletedTask;
        }

        protected string RouteKey;
        protected string QueueName;

        // Message handler; subclasses override this.
        public virtual bool Process(string message)
        {
            throw new NotImplementedException();
        }

        // The consumer is registered here.
        public void Register()
        {
            Console.WriteLine($"RabbitListener register,routeKey:{RouteKey}");
            channel.ExchangeDeclare(exchange: "message", type: "topic");
            channel.QueueDeclare(queue: QueueName, exclusive: false);
            channel.QueueBind(queue: QueueName,
                              exchange: "message",
                              routingKey: RouteKey);
            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model, ea) =>
            {
                // With RabbitMQ.Client 6.0 or later, ea.Body is a ReadOnlyMemory<byte>;
                // pass ea.Body.ToArray() to GetString in that case.
                var body = ea.Body;
                var message = Encoding.UTF8.GetString(body);
                var result = Process(message);
                if (result)
                {
                    channel.BasicAck(ea.DeliveryTag, false);
                }
                // When Process returns false, the message is left unacknowledged.
            };
            channel.BasicConsume(queue: QueueName, consumer: consumer);
        }

        public void DeRegister()
        {
            this.connection.Close();
        }

        public Task StopAsync(CancellationToken cancellationToken)
        {
            this.connection.Close();
            return Task.CompletedTask;
        }
    }
}
// An example subclass
using System;
using System.Text;
using Microsoft.Extensions.Options;
using Newtonsoft.Json.Linq;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Logging;

namespace Test.Listener
{
    public class ChapterLister : RabbitListener
    {
        private readonly ILogger<RabbitListener> _logger;
        // Process runs in a delegate callback, so services injected directly into this class
        // would not share a scope with it. To call other services from here, create a scope
        // from IServiceProvider and resolve the instances from that scope.
        private readonly IServiceProvider _services;

        public ChapterLister(IServiceProvider services, IOptions<AppConfiguration> options,
            ILogger<RabbitListener> logger) : base(options)
        {
            base.RouteKey = "done.task";
            base.QueueName = "lemonnovelapi.chapter";
            _logger = logger;
            _services = services;
        }

        public override bool Process(string message)
        {
            var taskMessage = JToken.Parse(message);
            if (taskMessage == null)
            {
                // Returning false signals that the message could not be handled;
                // the base class then does not acknowledge it.
                return false;
            }
            try
            {
                using (var scope = _services.CreateScope())
                {
                    // XxxxService is a placeholder for your own service type.
                    var xxxService = scope.ServiceProvider.GetRequiredService<XxxxService>();
                    return true;
                }
            }
            catch (Exception ex)
            {
                _logger.LogInformation($"Process fail,error:{ex.Message},stackTrace:{ex.StackTrace},message:{message}");
                _logger.LogError(-1, ex, "Process fail");
                return false;
            }
        }
    }
}

Then, remember...

When registering it in Startup.cs, use AddHostedService.

services.AddHostedService<ChapterLister>();
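The listener reads its connection settings from IOptions<AppConfiguration>, but the class itself is not shown in this article. A minimal sketch of what it might look like, and of how it could be bound in Startup.cs, follows. The four property names come from the listener code; their types, the default values, the `Test` namespace, and the "AppConfiguration" section name are assumptions for illustration.

```csharp
// Hypothetical sketch: only the property names are taken from the listener code.
// Types, defaults, the namespace, and the section name are assumptions.
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Test.Listener;

namespace Test
{
    public class AppConfiguration
    {
        public string RabbitHost { get; set; } = "localhost";
        public string RabbitUserName { get; set; } = "guest";
        public string RabbitPassword { get; set; } = "guest";
        public int RabbitPort { get; set; } = 5672; // default AMQP port
    }

    public class Startup
    {
        public Startup(IConfiguration configuration)
        {
            Configuration = configuration;
        }

        public IConfiguration Configuration { get; }

        public void ConfigureServices(IServiceCollection services)
        {
            // Bind the (assumed) "AppConfiguration" section of appsettings.json
            // so that IOptions<AppConfiguration> can be injected.
            services.Configure<AppConfiguration>(Configuration.GetSection("AppConfiguration"));

            // Start the consumer together with the application.
            services.AddHostedService<ChapterLister>();
        }
    }
}
```

Placing AppConfiguration in the parent namespace `Test` makes it visible from both Test.Listener and Test.SDK without an extra using directive.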

That is all there is to the consumer.

What about the producer?

That one is actually even simpler.


using System;
using System.Text;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Newtonsoft.Json;
using RabbitMQ.Client;

namespace Test.SDK
{
    public class RabbitMQClient
    {
        private readonly IModel _channel;
        private readonly ILogger _logger;

        public RabbitMQClient(IOptions<AppConfiguration> options, ILogger<RabbitMQClient> logger)
        {
            _logger = logger;
            try
            {
                var factory = new ConnectionFactory()
                {
                    HostName = options.Value.RabbitHost,
                    UserName = options.Value.RabbitUserName,
                    Password = options.Value.RabbitPassword,
                    Port = options.Value.RabbitPort,
                };
                var connection = factory.CreateConnection();
                _channel = connection.CreateModel();
            }
            catch (Exception ex)
            {
                logger.LogError(-1, ex, "RabbitMQClient init fail");
            }
        }

        public virtual void PushMessage(string routingKey, object message)
        {
            _logger.LogInformation($"PushMessage,routingKey:{routingKey}");
            // Declare the topic exchange that the consumers bind their queues to,
            // so that publishing also works when the producer starts first.
            _channel.ExchangeDeclare(exchange: "message", type: "topic");
            string msgJson = JsonConvert.SerializeObject(message);
            var body = Encoding.UTF8.GetBytes(msgJson);
            _channel.BasicPublish(exchange: "message",
                                  routingKey: routingKey,
                                  basicProperties: null,
                                  body: body);
        }
    }
}

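Be sure to register the instance as a singleton. The constructor opens a connection and a channel, so a singleton keeps them open and reuses them instead of creating new ones each time the client is resolved.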

services.AddSingleton<RabbitMQClient>();
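Once registered, the client can be injected wherever a message needs to be published. A minimal sketch, assuming a hypothetical TaskController, route, and payload shape; only RabbitMQClient.PushMessage and the "done.task" routing key come from the code in this article.

```csharp
// Hypothetical controller: the class name, route, action, and payload are assumptions.
using Microsoft.AspNetCore.Mvc;
using Test.SDK;

namespace Test.Controllers
{
    [Route("api/[controller]")]
    public class TaskController : Controller
    {
        private readonly RabbitMQClient _rabbit;

        // The singleton RabbitMQClient is supplied by dependency injection.
        public TaskController(RabbitMQClient rabbit)
        {
            _rabbit = rabbit;
        }

        [HttpPost("{id}/done")]
        public IActionResult Done(int id)
        {
            // "done.task" matches the RouteKey that ChapterLister binds its queue with,
            // so this message is routed to the "lemonnovelapi.chapter" queue.
            _rabbit.PushMessage("done.task", new { TaskId = id });
            return Ok();
        }
    }
}
```

PushMessage serializes the payload to JSON, so the consumer's Process method receives it as a JSON string it can parse with JToken.Parse.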
