2015-09-12 33 views
5

AWS Java SDK kullanarak AWS SQS kuyruğundan sorgulamak için Scala'da Akka Streams kullanıyorum. , Ne zamanAkka akımları sürekli nasıl gerçekleşebilir?

val system = ActorSystem("system") 
val sqsSource = Source.actorPublisher[Message](SQSSubscriber.props("queue-name")) 
val flow = Flow[Message] 
    .map { elem => system.log.debug(s"${elem.getBody} (${elem.getMessageId})"); elem } 
    .to(Sink.ignore) 

system.scheduler.schedule(0 seconds, 2 seconds) { 
    flow.runWith(sqsSource)(ActorMaterializer()(system)) 
} 

Ancak: ben de 2 saniyelik aralıklarla akışını çalıştırmak çalışılıyor Benim uygulamada

class SQSSubscriber(name: String) extends ActorPublisher[Message] { 
    implicit val materializer = ActorMaterializer() 

    val schedule = context.system.scheduler.schedule(0 seconds, 2 seconds, self, "dequeue") 

    val client = new AmazonSQSClient() 
    client.setRegion(RegionUtils.getRegion("us-east-1")) 
    val url = client.getQueueUrl(name).getQueueUrl 

    val MaxBufferSize = 100 
    var buf = Vector.empty[Message] 

    override def receive: Receive = { 
    case "dequeue" => 
     val messages = iterableAsScalaIterable(client.receiveMessage(new ReceiveMessageRequest(url).getMessages).toList 
     messages.foreach(self ! _) 
    case message: Message if buf.size == MaxBufferSize => 
     log.error("The buffer is full") 
    case message: Message => 
     if (buf.isEmpty && totalDemand > 0) 
     onNext(message) 
     else { 
     buf :+= message 
     deliverBuf() 
     } 
    case Request(_) => 
     deliverBuf() 
    case Cancel => 
     context.stop(self) 
    } 

    @tailrec final def deliverBuf(): Unit = 
    if (totalDemand > 0) { 
     if (totalDemand <= Int.MaxValue) { 
     val (use, keep) = buf.splitAt(totalDemand.toInt) 
     buf = keep 
     use foreach onNext 
     } else { 
     val (use, keep) = buf.splitAt(Int.MaxValue) 
     buf = keep 
     use foreach onNext 
     deliverBuf() 
     } 
    } 
} 

: Ben iki saniyelik aralıklarla iletileri dequeues bir ActorPublisher yarattı Uygulamamı çalıştırdığımda ve ActorMaterializer'un neden olduğu ölü mektup bildirimleri aldım.

Bir Akka Çayı'nın sürekli olarak gerçekleştirilmesi için önerilen bir yaklaşım var mı?

+0

Şimdi bunu test edemiyorum, ancak birden fazla ActorMaterializer örneğini kullanma konusunda emin değilim. ActorPublisher'da bir örnek ve tüm akış için bir tane daha kullanıyorsunuz. –

+0

Akka-Camel kullanarak bitirdim çünkü yapmam gereken her şeyi tamamlayan güzel bir SQS entegrasyonu var (https://github.com/fzakaria/Akka-Camel-SQS/). –

+0

Sürekli olarak farklı bir ActorPublisher'ı 2 saniyede bir kullanmanız için bir nedeniniz var mı?Verilen örnek kodlara göre, aynı yayıncıyı kullanmaya devam etmek çok daha kolay olurdu ... –

cevap

7

Sana yeni ActorPublisher her 2 saniyede oluşturmanız gerekir sanmıyorum. Bu gereksiz ve hafıza kaybıdır. Ayrıca, bir ActorPublisher'ın gerekli olduğunu düşünmüyorum. Koddan anlatabildiğim kadarıyla, uygulamanızın, aynı verileri sorgulayan, giderek artan sayıda Akışı olacaktır. İstemciden her bir Message, N farklı akka Akışı tarafından işlenecek ve daha da kötüsü, N zamanla büyüyecektir. Sonsuz Döngü scala en Iterator kullanarak

Sen ActorPublisher aynı davranışı elde edebilirsiniz sorgulama için

Iterator.

//setup the client 
val client = { 
    val sqsClient = new AmazonSQSClient() 
    sqsClient setRegion (RegionUtils getRegion "us-east-1") 
    sqsClient 
} 

val url = client.getQueueUrl(name).getQueueUrl 

//single query 
def queryClientForMessages : Iterable[Message] = iterableAsScalaIterable { 
    client receiveMessage (new ReceiveMessageRequest(url).getMessages) 
} 

def messageListIteartor : Iterator[Iterable[Message]] = 
    Iterator continually messageListStream 

//messages one-at-a-time "on demand", no timer pushing you around 
def messageIterator() : Iterator[Message] = messageListIterator flatMap identity 

Bu uygulama sadece önceki tüm mesajlar tüketilen edilmiş müşteri sorgular ve bu nedenle gerçekten reactive geçerli: Sürekli müşteri sorgular bir yineleyici oluşturmak mümkündür. Sabit boyutta bir arabellek izlemeye gerek yok. Çözümünüzün bir arabelleğe ihtiyacı vardır, çünkü Mesajların oluşturulması (bir zamanlayıcı aracılığıyla) Mesajların tüketiminden (println yoluyla) ayrıştırılır. Uygulamamda, & üretiminin oluşturulması, arka basınç üzerinden tightly coupled.

Akka Akış Kaynağı

Daha sonra, bir akka akışı Kaynak beslemek için bu Yineleyici jeneratör-işlevi kullanabilirsiniz:

def messageSource : Source[Message, _] = Source fromIterator messageIterator 

Akış Formation

Ve nihayet bunu kullanabilirsiniz println gerçekleştirmek için kaynak (Bir yan not olarak: sizin flow değeriniz aslında bir Sink si nce Flow + Sink = Sink). Söz adresinin flow değerini kullanarak:

messageSource runWith flow 

Bir akka Stream işlemci bütün mesajları.