2016-03-21 19 views
0

Özel bir uygulama için bir Coder (StringUtf8Coder) nasıl değiştirileceğini anlamaya çalışıyorum.Geriye dönük uyumlu bir kodlayıcı ile bir dönüşüm için kodlayıcı değiştirme

import com.google.cloud.dataflow.sdk.coders.ByteArrayCoder; 
import com.google.cloud.dataflow.sdk.coders.Coder; 
import com.google.cloud.dataflow.sdk.coders.CoderException; 
import com.google.cloud.dataflow.sdk.coders.DelegateCoder; 
import com.google.common.base.Charsets; 
import org.xerial.snappy.Snappy; 

import java.io.IOException; 

public class CompressedByteArrayCoder extends DelegateCoder<String, byte[]> { 

    private static String decompressSnappy(byte[] input) throws IOException { 
     if (input == null) { 
      throw new CoderException("null input is not accepted"); 
     } 
     if (Snappy.isValidCompressedBuffer(input)) { 
      return Snappy.uncompressString(input); 
     } 
     return new String(input, Charsets.UTF_8); 
    } 

    private static byte[] compressSnappy(String input) throws IOException { 
     return Snappy.compress(input); 
    } 

    public static CompressedByteArrayCoder of() { 
     return new CompressedByteArrayCoder(ByteArrayCoder.of(), CompressedByteArrayCoder::compressSnappy, CompressedByteArrayCoder::decompressSnappy); 
    } 

    private CompressedByteArrayCoder(Coder<byte[]> coder, CodingFunction<String, byte[]> toFn, CodingFunction<byte[], String> fromFn) { 
     super(coder, toFn, fromFn); 
    } 
} 

Ben de StringUtf8Coder (PubSubIO.Read için varsayılan) değiş tokuş için bir yol anlamaya çalışıyorum: Ben bir kodlayıcı uygulamış

çabuk sıkıştırılmış dizeleri işlemek için yeteneği ekler Veri akışı boru hattı güncellemesinin başarısız olmasına yol açmayacak şekilde.

Veri akışı hizmeti çalıştırıcısına, iki kodlayıcının "uyumlu" olduğunu nasıl anlatacağımı anlamaya çalışıyorum.

cevap

1

Maalesef, şu anda Google Cloud Dataflow hizmetinde çalışan bir boru hattını güncellerken PCollection kodlayıcıları değiştirilemez. Bu durumda, boru hattını yeni bir Dataflow işi olarak göndermeniz gerekir.

Daha fazla bilgi için, özellikle uyumluluk denetimi bölümü ile ilgili daha fazla bilgi için Updating an Existing Pipeline adresine bakın. Ancak bu, gelecekte ele alabileceğimiz bir şeydir. Lütfen herhangi bir güncelleme için belgelerimizi kontrol edin.