2016-03-31 40 views
0

C# ile RabbitMQ'yi bir ileti aracısı çerçevesi olarak kullanıyorum. Benim senaryoda, farklı kuyruklardan tüketilebilecek çok sayıda tüketicim var. birkaç kuyruk, bir tüketici perspektifinden mantıksal bir grup oluşturur.RabbitMQ, FIFO'yu korurken bir kanaldan 1 mesaj almak mümkün mü?

Örneğin: Kuyruk1, Kuyruk2, Kuyruk3.

Consumer1 [Queue1, Queue3] ilk mesaj alıp isteyeceksiniz

mevcut değil deneyin Queue2

den almaya yalnızca Consumer2 ve [Queue2, Queue3] ilk mesajı almak isteyecektir

durumunda kullanılabilir değil deneyin Queue1

'dan alınması gerekenler Aşağıda her biri, kuyruk alt kümesi hakkında Tüketici bakımı mümkündür. Tüketici2, tüm sıralarda, öncelikler 1,2 ile ilgilenir, dolayısıyla tüketici önceliklerin mantığını korur.

public class MessagesProvider 
{ 

    private IConnection _connection; 
    private IModel _channel; 
    private readonly IConnectionFactory _connectionFactory; 
    private ConcurrentDictionary<string, string> _tagsQueueItems; 
    private EventingBasicConsumer _consumer; 
    private CancellationTokenSource _cts; 
    private TaskCompletionSource<BasicGetResult> _tcs; 

    public MessagesProvider() 
    { 
     _connectionFactory = new ConnectionFactory() 
     { 
      HostName = "localhost" 
     }; 

    } 

    public Task<BasicGetResult> GetMessage(int timeout, IEnumerable<string> queues) 
    { 
     _tagsQueueItems = new ConcurrentDictionary<string, string>(); 
     _cts = new CancellationTokenSource(); 
     _connection = _connectionFactory.CreateConnection(); 
     _channel = _connection.CreateModel(); 
     _channel.SingleMessagePerChannel(); 
     _consumer = new EventingBasicConsumer(_channel); 
     _consumer.Received += OnReceive; 

     foreach (var queue in queues) 
     { 
      var tag = _channel.BasicConsume(queue, false, _consumer); 
      _tagsQueueItems.AddOrUpdate(queue, tag, (k, o) => queue); 
     } 

     return _tcs.Task; 
    } 

    private void SetResult(BasicGetResult result) 
    { 
     _tcs.SetResult(result); 
     if (_channel.IsOpen) 
      _channel.Close(); 
     if (_connection.IsOpen) 
      _connection.Close(); 
    } 

    private void OnReceive(object sender, BasicDeliverEventArgs e) 
    { 
     _consumer.Received -= OnReceive; 
     _channel.BasicAck(e.DeliveryTag, false); 
     var result = new BasicGetResult(e.DeliveryTag, e.Redelivered, e.Exchange, e.RoutingKey, 1, 
      e.BasicProperties, e.Body); 

     SetResult(result); 

    } 
} 

Kanal tanımını:

public static class ChannelExtension 
{ 
    public static void SingleMessagePerChannel(this IModel channel) 
    { 
     channel.BasicQos(0,1,true); 
    } 

    public static void SingleMessagePerConsumer(this IModel channel) 
    { 
     channel.BasicQos(0,1,false); 
    } 
} 
bu burada İşte enter image description here

RabbitMQ tutorials sunulmaktadır çalışma sıraları farklı bir senaryo olduğunu Not bir ileti dönmek gerekiyordu bir sınıftır

Sorun şu ki, bir sıraya ("GetMessage" yönteminde) bir sıra için kayıt yapıyorum ve bir sonraki FIFO messa'yı getiren bir "atomik" işlemi bulamadım. Bir grup kuyruktan ge.

Bunu yapmanın bir yolunu arıyorum: _consumer.getNextMessage();

başka bir yaklaşım, öncelik ilk grubundan tüm ilk en iletileri tüketmek en eski mesajı bulmak için tüketici tarafında bunları filtre ve (aynı öncelik grubunda) diğer mesajlar için hiçbir ack göndermek olacaktır . Bu yaklaşım, mesajların getirildiği bir noktada, diğer tüketicilerin bunları işlemeyeceği anlamına gelmediği için problemlidir (ack'e kadar).

Herhangi bir öneriniz var mı? Teşekkürler.

+1

Ayarın yanlış ayarlanmış gibi görünüyor. Elde etmeye çalıştığınız şeyi detaylandırır mısınız lütfen? – cantSleepNow

+0

+1 to @CantSleepNow - bu senaryo kötü bir tasarıma benziyor ve yapmak istediğiniz şey büyük problemler olmaksızın gerçekten mümkün değil. Ne problemi çözmeye çalışıyorsun? –

+0

@ cantSleepNow, @ Derick Bailey, Tasarımın kötü olduğunu ve beğenmediğimi kabul ediyorum. Tüketici düzeyinde önceliği belirlemek yanlıştır IMO, şu anda çalışan bir sistemin istemcileri tarafından gereklidir. elde etmek, üçüncü taraf tüketicilerin hangi mesajların onlar için önemli olduğunu tanımlamasına izin vermektir. Her tüketici kendi öncelikleri vardır. Bu, aynı denge içindeki tüketiciler arasındaki bir çalışma yük dengesinden daha karmaşık bir senaryo. Bu, bunun için RabbitMQ kullanan kutudan bir çözüm var. Diyagramı daha yakından inceleyebilir, ihtiyaçlarımızı açıklar. – user440850

cevap

0

Kuyrukları ve Tüketicileri tasarlandıkları şekilde kullandığınızı sanmıyorum. Bir işin yapılması için bir kova olarak bir kuyruk düşünün. Her iş kolunda, işi yapan N tüketicileri olabilir.

Her tüketici, diğer kuyruklar veya diğer tüketicilerle ilgili olarak bilmemek veya ilgilenmek üzere tek bir sırayla çalışmak üzere tasarlanmalıdır.

  • Düzenli Öncelik Sırası
  • Yüksek Öncelikli Kuyruk
: sonra bir "yüksek öncelikli" kuyruk ulaşmak için çalışıyorsanız

Ben 2 farklı kuyruklar ile tek alışverişini yüksek olduğu ileri sürüldü

Ardından, varsayılan olarak, iletiler normal öncelik sırasına alınır. Yüksek öncelikli olması gereken bir mesaj geldiğinde, mesajın üstbilgisini Yüksek Öncelikli Sıranın içine itilecek şekilde mesaja ayarlayabilirsiniz.

Bu aynı yaklaşımı bugün bir doğrudan kuyruğunu kullanarak ve bir iletinin yüksek öncelikli sıraya gitmesini istediğimizde bir başlık ekleyerek kullanıyoruz.

+0

Neyse ki, yığın tutucuları RabbitMQ darbesinin problem için uygun olmadığına ikna edebildim, bu yüzden sorunu değiştirdiler, gereksinimleri basitleştirdiler, farklı tüketicilerin karmaşıklığını farklı mantıksal gruplardan çıkardılar. Artık tüm tüketiciler, tek bir sıraya (grup başına) yönlendirilecek olan aynı gruplardan tüketecekler. Teşekkürler. – user440850