Özel bir uygulama için bir Coder (StringUtf8Coder) nasıl değiştirileceğini anlamaya çalışıyorum.Geriye dönük uyumlu bir kodlayıcı ile bir dönüşüm için kodlayıcı değiştirme
import com.google.cloud.dataflow.sdk.coders.ByteArrayCoder;
import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.coders.CoderException;
import com.google.cloud.dataflow.sdk.coders.DelegateCoder;
import com.google.common.base.Charsets;
import org.xerial.snappy.Snappy;
import java.io.IOException;
public class CompressedByteArrayCoder extends DelegateCoder<String, byte[]> {
private static String decompressSnappy(byte[] input) throws IOException {
if (input == null) {
throw new CoderException("null input is not accepted");
}
if (Snappy.isValidCompressedBuffer(input)) {
return Snappy.uncompressString(input);
}
return new String(input, Charsets.UTF_8);
}
private static byte[] compressSnappy(String input) throws IOException {
return Snappy.compress(input);
}
public static CompressedByteArrayCoder of() {
return new CompressedByteArrayCoder(ByteArrayCoder.of(), CompressedByteArrayCoder::compressSnappy, CompressedByteArrayCoder::decompressSnappy);
}
private CompressedByteArrayCoder(Coder<byte[]> coder, CodingFunction<String, byte[]> toFn, CodingFunction<byte[], String> fromFn) {
super(coder, toFn, fromFn);
}
}
Ben de StringUtf8Coder (PubSubIO.Read için varsayılan) değiş tokuş için bir yol anlamaya çalışıyorum: Ben bir kodlayıcı uygulamış
çabuk sıkıştırılmış dizeleri işlemek için yeteneği ekler Veri akışı boru hattı güncellemesinin başarısız olmasına yol açmayacak şekilde.
Veri akışı hizmeti çalıştırıcısına, iki kodlayıcının "uyumlu" olduğunu nasıl anlatacağımı anlamaya çalışıyorum.