C# ile RabbitMQ'yi bir ileti aracısı çerçevesi olarak kullanıyorum. Benim senaryoda, farklı kuyruklardan tüketilebilecek çok sayıda tüketicim var. birkaç kuyruk, bir tüketici perspektifinden mantıksal bir grup oluşturur.RabbitMQ, FIFO'yu korurken bir kanaldan 1 mesaj almak mümkün mü?
Örneğin: Kuyruk1, Kuyruk2, Kuyruk3.
Consumer1 [Queue1, Queue3] ilk mesaj alıp isteyeceksinizmevcut değil deneyin Queue2
den almaya yalnızca Consumer2 ve [Queue2, Queue3] ilk mesajı almak isteyecektirdurumunda kullanılabilir değil deneyin Queue1
'dan alınması gerekenler Aşağıda her biri, kuyruk alt kümesi hakkında Tüketici bakımı mümkündür. Tüketici2, tüm sıralarda, öncelikler 1,2 ile ilgilenir, dolayısıyla tüketici önceliklerin mantığını korur.
public class MessagesProvider
{
private IConnection _connection;
private IModel _channel;
private readonly IConnectionFactory _connectionFactory;
private ConcurrentDictionary<string, string> _tagsQueueItems;
private EventingBasicConsumer _consumer;
private CancellationTokenSource _cts;
private TaskCompletionSource<BasicGetResult> _tcs;
public MessagesProvider()
{
_connectionFactory = new ConnectionFactory()
{
HostName = "localhost"
};
}
public Task<BasicGetResult> GetMessage(int timeout, IEnumerable<string> queues)
{
_tagsQueueItems = new ConcurrentDictionary<string, string>();
_cts = new CancellationTokenSource();
_connection = _connectionFactory.CreateConnection();
_channel = _connection.CreateModel();
_channel.SingleMessagePerChannel();
_consumer = new EventingBasicConsumer(_channel);
_consumer.Received += OnReceive;
foreach (var queue in queues)
{
var tag = _channel.BasicConsume(queue, false, _consumer);
_tagsQueueItems.AddOrUpdate(queue, tag, (k, o) => queue);
}
return _tcs.Task;
}
private void SetResult(BasicGetResult result)
{
_tcs.SetResult(result);
if (_channel.IsOpen)
_channel.Close();
if (_connection.IsOpen)
_connection.Close();
}
private void OnReceive(object sender, BasicDeliverEventArgs e)
{
_consumer.Received -= OnReceive;
_channel.BasicAck(e.DeliveryTag, false);
var result = new BasicGetResult(e.DeliveryTag, e.Redelivered, e.Exchange, e.RoutingKey, 1,
e.BasicProperties, e.Body);
SetResult(result);
}
}
Kanal tanımını:
public static class ChannelExtension
{
public static void SingleMessagePerChannel(this IModel channel)
{
channel.BasicQos(0,1,true);
}
public static void SingleMessagePerConsumer(this IModel channel)
{
channel.BasicQos(0,1,false);
}
}
bu burada İşte
RabbitMQ tutorials sunulmaktadır çalışma sıraları farklı bir senaryo olduğunu Not bir ileti dönmek gerekiyordu bir sınıftır
Sorun şu ki, bir sıraya ("GetMessage" yönteminde) bir sıra için kayıt yapıyorum ve bir sonraki FIFO messa'yı getiren bir "atomik" işlemi bulamadım. Bir grup kuyruktan ge.
Bunu yapmanın bir yolunu arıyorum: _consumer.getNextMessage();
başka bir yaklaşım, öncelik ilk grubundan tüm ilk en iletileri tüketmek en eski mesajı bulmak için tüketici tarafında bunları filtre ve (aynı öncelik grubunda) diğer mesajlar için hiçbir ack göndermek olacaktır . Bu yaklaşım, mesajların getirildiği bir noktada, diğer tüketicilerin bunları işlemeyeceği anlamına gelmediği için problemlidir (ack'e kadar).
Herhangi bir öneriniz var mı? Teşekkürler.
Ayarın yanlış ayarlanmış gibi görünüyor. Elde etmeye çalıştığınız şeyi detaylandırır mısınız lütfen? – cantSleepNow
+1 to @CantSleepNow - bu senaryo kötü bir tasarıma benziyor ve yapmak istediğiniz şey büyük problemler olmaksızın gerçekten mümkün değil. Ne problemi çözmeye çalışıyorsun? –
@ cantSleepNow, @ Derick Bailey, Tasarımın kötü olduğunu ve beğenmediğimi kabul ediyorum. Tüketici düzeyinde önceliği belirlemek yanlıştır IMO, şu anda çalışan bir sistemin istemcileri tarafından gereklidir. elde etmek, üçüncü taraf tüketicilerin hangi mesajların onlar için önemli olduğunu tanımlamasına izin vermektir. Her tüketici kendi öncelikleri vardır. Bu, aynı denge içindeki tüketiciler arasındaki bir çalışma yük dengesinden daha karmaşık bir senaryo. Bu, bunun için RabbitMQ kullanan kutudan bir çözüm var. Diyagramı daha yakından inceleyebilir, ihtiyaçlarımızı açıklar. – user440850