GraphStageWithMaterializedValue
kullanarak bir çözüm buldum. Bu konsept Play's maxLength
body parser'dan ödünç alınmıştır. Sorgumdaki ilk denemem arasındaki temel fark (derleme yapmayan), akışı mutasyona uğratmak yerine, işleme durumu hakkında bilgi aktarmak için maddi değeri kullanmam gerektiğidir. Bir Flow[ByteString, Either[Result, ByteString], NotUsed]
oluşturmuşken, ihtiyacım olan şey Flow[ByteString, ByteString, Future[Boolean]]
idi.
def parser[A](otherParser: BodyParser[A]): BodyParser[A] = BodyParser { request =>
val flow: Flow[ByteString, ByteString, Future[Boolean]] = Flow.fromGraph(new BodyValidator(request.headers.get("Some-Header")))
val parserSink: Sink[ByteString, Future[Either[Result, A]]] = otherParser.apply(request).toSink
Accumulator(flow.toMat(parserSink) { (statusFuture: Future[Boolean], resultFuture: Future[Either[Result, A]]) =>
statusFuture.flatMap { success =>
if (success) {
resultFuture.map {
case Left(result) => Left(result)
case Right(a) => Right(a)
}
} else {
Future.successful(Left(BadRequest))
}
}
})
}
anahtar çizgisi bu biridir:
bununla Yani, benim parser
fonksiyon bu gibi bakarak biter
val flow: Flow[ByteString, ByteString, Future[Boolean]] = Flow.fromGraph(new BodyValidator(request.headers.get("Some-Header")))
dinlenme naziksiniz edebiliyoruz kez yere düşüyor
Bu akışı yarat. Ne yazık ki, BodyValidator
oldukça verbose ve biraz kazan-platey hissediyor. Her durumda, okunması oldukça kolaydır. GraphStageWithMaterializedValue
, bu grafiğin giriş türünü ve çıkış türünü belirtmek için def shape: S
( burada) uygulamanızı bekler. Ayrıca, grafiğin gerçekte ne yapması gerektiğini tanımlamak için def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, M)
(M
bir Future[Boolean]
buradadır) imlenmesini bekler. İşte BodyValidator
tam kodu (aşağıda daha ayrıntılı açıklayacağımız) var:
class BodyValidator(expected: Option[String]) extends GraphStageWithMaterializedValue[FlowShape[ByteString, ByteString], Future[Boolean]] {
val in = Inlet[ByteString]("BodyValidator.in")
val out = Outlet[ByteString]("BodyValidator.out")
override def shape: FlowShape[ByteString, ByteString] = FlowShape.of(in, out)
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[Boolean]) = {
val status = Promise[Boolean]()
val bodyBuffer = new ByteStringBuilder()
val logic = new GraphStageLogic(shape) {
setHandler(out, new OutHandler {
override def onPull(): Unit = pull(in)
})
setHandler(in, new InHandler {
def onPush(): Unit = {
val chunk = grab(in)
bodyBuffer.append(chunk)
push(out, chunk)
}
override def onUpstreamFinish(): Unit = {
val fullBody = bodyBuffer.result()
status.success(expected.map(ByteString(_)).contains(fullBody))
completeStage()
}
override def onUpstreamFailure(e: Throwable): Unit = {
status.failure(e)
failStage(e)
}
})
}
(logic, status.future)
}
}
Öncelikle grafik için girişler ve çıkışlar kurmak için bir Inlet
ve Outlet
oluşturmak istediğiniz
val in = Inlet[ByteString]("BodyValidator.in")
val out = Outlet[ByteString]("BodyValidator.out")
Sonra bunları shape
tanımlamak için kullanırsınız.createLogicAndMaterializedValue
İçinde
def shape: FlowShape[ByteString, ByteString] = FlowShape.of(in, out)
Eğer materialze niyetinde değerini başlatmak gerekir. Burada, akıştan tam verilere sahip olduğumda çözülebilecek bir söz kullandım. Yinelemeler arasındaki verileri izlemek için ByteStringBuilder
da oluşturuyorum.
val status = Promise[Boolean]()
val bodyBuffer = new ByteStringBuilder()
Sonra bir
GraphStageLogic
aslında bu grafik işleme her noktada ne yaptığını kurmak oluşturun. İki işleyici ayarlanıyor. Biri, yukarı akış kaynağından gelen verilerle uğraşmak için bir
InHandler
'dur. Diğeri, aşağı yönde gönderilecek veri ile ilgilenmek için
OutHandler
'dur.
OutHandler
'da gerçekten ilginç bir şey yok, dolayısıyla bir
IllegalStateException
'dan kaçınmak için gerekli kazan plakası olduğunu söylemek için burada görmezden geleceğim. Üç yöntem
InHandler
:
onPush
,
onUpstreamFinish
ve
onUpstreamFailure
'da geçersiz kılınmıştır. Yeni veriler hazırda olduğunda
onPush
çağrılır. Bu yöntemde, bir sonraki veri yığınını alıyorum,
bodyBuffer
'a yazıp veriyi aşağıya doğru aktaracağım.
def onPush(): Unit = {
val chunk = grab(in)
bodyBuffer.append(chunk)
push(out, chunk)
}
onUpstreamFinish
çağrıldığında üst yüzeyler (sürpriz). Bu, bedeni başlık ile karşılaştırmanın iş mantığının gerçekleştiği yerdir. bir şeyler ters gittiğinde de başarısız olarak, ben hayata geleceği işaretlemek böylece
override def onUpstreamFinish(): Unit = {
val fullBody = bodyBuffer.result()
status.success(expected.map(ByteString(_)).contains(fullBody))
completeStage()
}
onUpstreamFailure
uygulanmaktadır.
override def onUpstreamFailure(e: Throwable): Unit = {
status.failure(e)
failStage(e)
}
Sonra ben sadece bir demet olarak oluşturduğum
GraphStageLogic
ve
status.future
dönün.