2016-12-13 73 views
8

Akka Streams ve Akka HTTP için oldukça yeni biriyim.Akka HTTP, bir çıkış akışı aracılığıyla içerik üretmek için nasıl kullanılır?

Klasörün içeriğinden bir zip dosyası oluşturabilen ve istemciye gönderebilen basit bir HTTP sunucusu oluşturmak istiyorum.

org.zeroturnaround.zip.ZipUtil bir zip dosyası oluşturma görevini çok kolaylaştırır, ancak bir outputStream gerekir.

İşte (Scala dilinde yazılmış) benim çözümdür:

  val os = new ByteArrayOutputStream() 
      ZipUtil.pack(myFolder, os) 
      HttpResponse(entity = HttpEntity(
       MediaTypes.`application/zip`, 
       os.toByteArray)) 

Bu çözüm çalışır ancak belleğe tüm içeriğini tutar, bu yüzden ölçeklenebilir değildir.

val source = StreamConverters.asOutputStream() 

ama nasıl kullanılacağını bilmiyorum:

Bunu çözmek için anahtar bu kullanmak olduğunu düşünüyorum. :-(

Herhangi yardım

lütfen? Ben de aynı problem vardı

cevap

3

. Ben sırayla size Akka dönmek sonradan StreamConverters.fromInputStream(() => input) aracılığıyla Source dönüştürülür yapay InputStream yazmak zorunda o geri basınç uyumlu hale getirmek için Eğer bulursanız -http DSL complete yönergesi.

İşte

yazdıklarımı olduğunu.

import java.io.{File, IOException, InputStream} 
import java.nio.charset.StandardCharsets 
import java.time.LocalDate 
import java.time.format.DateTimeFormatter 

import org.apache.commons.compress.archivers.sevenz.{SevenZArchiveEntry, SevenZFile} 

import scala.annotation.tailrec 
import scala.util.{Failure, Success, Try} 

class DownloadStatsZipReader(path: String, password: String) extends InputStream { 

    private val (archive, targetDate) = { 
    val inputFile = new SevenZFile(new File(path), password.getBytes(StandardCharsets.UTF_16LE.displayName())) 

    @tailrec 
    def findValidEntry(): Option[(LocalDate, SevenZArchiveEntry)] = 
     Option(inputFile.getNextEntry) match { 
     case Some(entry) => 
      if (!entry.isDirectory) { 
      val parts = entry.getName.toLowerCase.split("\\.(?=[^\\.]+$)") 
      if (parts(1) == "tab" && entry.getSize > 0) 
       Try(LocalDate.parse(parts(0), DateTimeFormatter.ISO_LOCAL_DATE)) match { 
       case Success(localDate) => 
        Some(localDate -> entry) 
       case Failure(_) => 
        findValidEntry() 
       } 
      else 
       findValidEntry() 
      } else 
      findValidEntry() 
     case None => None 
     } 

    val (date, _) = findValidEntry().getOrElse { 
     throw new RuntimeException(s"$path has no files named as `YYYY-MM-DD.tab`") 
    } 
    inputFile -> date 
    } 

    private val buffer = new Array[Byte](1024) 
    private var offsetBuffer: Int = 0 
    private var sizeBuffer: Int = 0 

    def getTargetDate: LocalDate = targetDate 

    override def read(): Int = 
    sizeBuffer match { 
     case -1 => 
     -1 
     case 0 => 
     loadNextChunk() 
     read() 
     case _ => 
     if (offsetBuffer < sizeBuffer) { 
      val result = buffer(offsetBuffer) 
      offsetBuffer += 1 
      result 
     } else { 
      sizeBuffer = 0 
      read() 
     } 
    } 

    @throws[IOException] 
    override def close(): Unit = { 
    archive.close() 
    } 

    private def loadNextChunk(): Unit = try { 
    val bytesRead = archive.read(buffer) 
    if (bytesRead >= 0) { 
     offsetBuffer = 0 
     sizeBuffer = bytesRead 
    } else { 
     offsetBuffer = -1 
     sizeBuffer = -1 
    } 
    } catch { 
    case ex: Throwable => 
     ex.printStackTrace() 
     throw ex 
    } 
} 

benim kodunda hata lütfen bana bildirin.

9

Yalnızca hemen gerçekleşmez olmayabilir kaynak hayata olur bir kez OutputStream'e, erişmek bu

val byteSource: Source[ByteString, Unit] = StreamConverters.asOutputStream() 
    .mapMaterializedValue(os => ZipUtil.pack(myFolder, os)) 
HttpResponse(entity = HttpEntity(
      MediaTypes.`application/zip`, 
      byteSource)) 

deneyin. Teoride kaynak aynı zamanda birden çok kez de gerçekleşebilirdi, bu yüzden bununla başa çıkabilmelisiniz.

+2

ben ... Bu kendin da oldukça öğretici ve eğlenceli kolay gibi görünüyor – expert

+1

daha önce bilseydim ama benim durumumda başarısız: 'java.lang.IllegalStateException: Henüz başlatılmadı: Sadece SetHandler GraphStageLogic yapıcısında ' –

+0

yazmaya :) –