AWS Java SDK kullanarak AWS SQS kuyruğundan sorgulamak için Scala'da Akka Streams kullanıyorum. , Ne zamanAkka akımları sürekli nasıl gerçekleşebilir?
val system = ActorSystem("system")
val sqsSource = Source.actorPublisher[Message](SQSSubscriber.props("queue-name"))
val flow = Flow[Message]
.map { elem => system.log.debug(s"${elem.getBody} (${elem.getMessageId})"); elem }
.to(Sink.ignore)
system.scheduler.schedule(0 seconds, 2 seconds) {
flow.runWith(sqsSource)(ActorMaterializer()(system))
}
Ancak: ben de 2 saniyelik aralıklarla akışını çalıştırmak çalışılıyor Benim uygulamada
class SQSSubscriber(name: String) extends ActorPublisher[Message] {
implicit val materializer = ActorMaterializer()
val schedule = context.system.scheduler.schedule(0 seconds, 2 seconds, self, "dequeue")
val client = new AmazonSQSClient()
client.setRegion(RegionUtils.getRegion("us-east-1"))
val url = client.getQueueUrl(name).getQueueUrl
val MaxBufferSize = 100
var buf = Vector.empty[Message]
override def receive: Receive = {
case "dequeue" =>
val messages = iterableAsScalaIterable(client.receiveMessage(new ReceiveMessageRequest(url).getMessages).toList
messages.foreach(self ! _)
case message: Message if buf.size == MaxBufferSize =>
log.error("The buffer is full")
case message: Message =>
if (buf.isEmpty && totalDemand > 0)
onNext(message)
else {
buf :+= message
deliverBuf()
}
case Request(_) =>
deliverBuf()
case Cancel =>
context.stop(self)
}
@tailrec final def deliverBuf(): Unit =
if (totalDemand > 0) {
if (totalDemand <= Int.MaxValue) {
val (use, keep) = buf.splitAt(totalDemand.toInt)
buf = keep
use foreach onNext
} else {
val (use, keep) = buf.splitAt(Int.MaxValue)
buf = keep
use foreach onNext
deliverBuf()
}
}
}
: Ben iki saniyelik aralıklarla iletileri dequeues bir ActorPublisher yarattı Uygulamamı çalıştırdığımda ve ActorMaterializer
'un neden olduğu ölü mektup bildirimleri aldım.
Bir Akka Çayı'nın sürekli olarak gerçekleştirilmesi için önerilen bir yaklaşım var mı?
Şimdi bunu test edemiyorum, ancak birden fazla ActorMaterializer örneğini kullanma konusunda emin değilim. ActorPublisher'da bir örnek ve tüm akış için bir tane daha kullanıyorsunuz. –
Akka-Camel kullanarak bitirdim çünkü yapmam gereken her şeyi tamamlayan güzel bir SQS entegrasyonu var (https://github.com/fzakaria/Akka-Camel-SQS/). –
Sürekli olarak farklı bir ActorPublisher'ı 2 saniyede bir kullanmanız için bir nedeniniz var mı?Verilen örnek kodlara göre, aynı yayıncıyı kullanmaya devam etmek çok daha kolay olurdu ... –