2016-12-18 29 views
5

bir akış veri kaynağı oluşturmak için:Apache Flink veri akışından kullanım değerleri dinamik olarak ben aşağıdakileri yapar Apache FLINK kullanarak örnek bir uygulama oluşturmak çalışıyorum

  1. stok sembollerin bir demetini okur (örn Kafka kuyruğundan 'CSCO', 'FB').
  2. Her sembol için, geçerli fiyatların gerçek zamanlı olarak araştırılması gerçekleştirilir ve akış aşağı işlem için değerleri akar.

* Orijinal yayına *

güncelleme Ben ayrı sınıfa haritası işlevini taşındı ve çalışma zamanı hata mesajı "MapFunction uygulanması artık seri hale getirilebilir değil alamadım. Nesne muhtemelen seri hale getirilemeyen alanlar "içeriyor veya başvuruyor".

Şu anda karşı karşıya olduğum konu Kafka konusundaki "stok fiyatlarının" fiyatları almaya çalışmadığına inanıyorum. Sorun çekmeye çalışıyorum ve herhangi bir güncelleme yayınlayacağım.

public class RetrieveStockPrices { 
    @SuppressWarnings("serial") 
    public static void main(String[] args) throws Exception { 
     final StreamExecutionEnvironment streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment(); 
     streamExecEnv.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); 

     Properties properties = new Properties(); 
     properties.setProperty("bootstrap.servers", "localhost:9092"); 
     properties.setProperty("zookeeper.connect", "localhost:2181"); 
     properties.setProperty("group.id", "stocks"); 

     DataStream<String> streamOfStockSymbols = streamExecEnv.addSource(new FlinkKafkaConsumer08<String>("stocksymbol", new SimpleStringSchema(), properties)); 

     DataStream<String> stockPrice = 
      streamOfStockSymbols 
      //get unique keys 
      .keyBy(new KeySelector<String, String>() { 
       @Override 
       public String getKey(String trend) throws Exception { 
        return trend; 
       } 
       }) 
      //collect events over a window 
      .window(TumblingEventTimeWindows.of(Time.seconds(60))) 
      //return the last event from the window...all elements are the same "Symbol" 
      .apply(new WindowFunction<String, String, String, TimeWindow>() { 
       @Override 
       public void apply(String key, TimeWindow window, Iterable<String> input, Collector<String> out) throws Exception { 
        out.collect(input.iterator().next().toString()); 
       } 
      }) 
      .map(new StockSymbolToPriceMapFunction()); 

     streamExecEnv.execute("Retrieve Stock Prices"); 
    } 
} 

public class StockSymbolToPriceMapFunction extends RichMapFunction<String, String> { 
    @Override 
    public String map(String stockSymbol) throws Exception { 
     final StreamExecutionEnvironment streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment(); 
     streamExecEnv.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); 
     System.out.println("StockSymbolToPriceMapFunction: stockSymbol: " + stockSymbol); 

     DataStream<String> stockPrices = streamExecEnv.addSource(new LookupStockPrice(stockSymbol)); 
     stockPrices.keyBy(new CustomKeySelector()).addSink(new FlinkKafkaProducer08<String>("localhost:9092", "stockprices", new SimpleStringSchema())); 

     return "100000"; 
    } 

    private static class CustomKeySelector implements KeySelector<String, String> { 
     @Override 
     public String getKey(String arg0) throws Exception { 
      return arg0.trim(); 
     } 
    } 
} 


public class LookupStockPrice extends RichSourceFunction<String> { 
    public String stockSymbol = null; 
    public boolean isRunning = true; 

    public LookupStockPrice(String inSymbol) { 
      stockSymbol = inSymbol; 
    } 

    @Override 
    public void open(Configuration parameters) throws Exception { 
      isRunning = true; 
    } 


    @Override 
    public void cancel() { 
      isRunning = false; 
    } 

    @Override 
    public void run(SourceFunction.SourceContext<String> ctx) 
        throws Exception { 
      String stockPrice = "0"; 
      while (isRunning) { 
       //TODO: query Google Finance API 
       stockPrice = Integer.toString((new Random()).nextInt(100)+1); 
       ctx.collect(stockPrice); 
       Thread.sleep(10000); 
      } 
    } 
} 

cevap

4

StreamExecutionEnvironment bir akış uygulaması operatörlerin iç kullanılmak üzere girintili değildir. Amaçlanan araç, bu test edilmez ve teşvik edilmez. Çalışabilir ve bir şeyler yapabilir, ancak büyük olasılıkla iyi davranmayacak ve muhtemelen uygulamanızı öldürecektir.

Programınızdaki StockSymbolToPriceMapFunction, gelen her kayıt için tamamen yeni ve bağımsız bir yeni akış uygulaması belirtir. Ancak, streamExecEnv.execute()'u aramadığınız için programlar başlatılmaz ve map yöntemi hiçbir şey yapmadan döner.

Eğer olur çağrı streamExecEnv.execute(), fonksiyon işçiler JVM Yeni bir yerel Flink küme başlatmak ve bu yerel Flink küme üzerinde uygulamayı başlatmak istiyorsanız. Yerel Flink örneği, yığın alanının büyük bir kısmını alacaktır ve birkaç kümenin başlatılmasından sonra, işçinin muhtemelen ne olmasını istediğinizi değil, OutOfMemoryError nedeniyle ölecektir.

+0

Gelen verilere yanıt olarak dinamik olarak akış oluşturmak mümkün mü? –

+0

Dinamik olarak gelen kayıtları temel alan verileri okuyan ve yayan bir "FlatMapFunction" uygulayabilirsiniz. Örneğin, dosya adlarına sahip bir akışınız varsa, bir 'FlatMapFunction' bu dosyaları açabilir ve verilerini yayınlayabilirsiniz. Bununla birlikte, tüm kayıtların çıktı türleri aynı olmalıdır. Ayrıca, olay zamanı işlem semantiğinin doğru olması zor olabilir, ancak bu dinamik olarak eklenen kaynakların genel bir sorunudur. –

+0

@FabianHueske Benzer bir kullanım durumunu çözüyorum. Yani eğer FlatMapFunction kullanmam gerekiyorsa, dosyayı normal dosya API'sini kullanarak scala/Java kullanarak ve Flink'in readTextFile'ını kullanmadan okumak zorundayız. Nedeni biz StreamMoelBu ortamı düzMap içinde kullanamazsınız. Anlayışım doğru mu? –