bir akış veri kaynağı oluşturmak için:Apache Flink veri akışından kullanım değerleri dinamik olarak ben aşağıdakileri yapar Apache FLINK kullanarak örnek bir uygulama oluşturmak çalışıyorum
- stok sembollerin bir demetini okur (örn Kafka kuyruğundan 'CSCO', 'FB').
- Her sembol için, geçerli fiyatların gerçek zamanlı olarak araştırılması gerçekleştirilir ve akış aşağı işlem için değerleri akar.
* Orijinal yayına *
güncelleme Ben ayrı sınıfa haritası işlevini taşındı ve çalışma zamanı hata mesajı "MapFunction uygulanması artık seri hale getirilebilir değil alamadım. Nesne muhtemelen seri hale getirilemeyen alanlar "içeriyor veya başvuruyor".
Şu anda karşı karşıya olduğum konu Kafka konusundaki "stok fiyatlarının" fiyatları almaya çalışmadığına inanıyorum. Sorun çekmeye çalışıyorum ve herhangi bir güncelleme yayınlayacağım.
public class RetrieveStockPrices {
@SuppressWarnings("serial")
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment();
streamExecEnv.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("zookeeper.connect", "localhost:2181");
properties.setProperty("group.id", "stocks");
DataStream<String> streamOfStockSymbols = streamExecEnv.addSource(new FlinkKafkaConsumer08<String>("stocksymbol", new SimpleStringSchema(), properties));
DataStream<String> stockPrice =
streamOfStockSymbols
//get unique keys
.keyBy(new KeySelector<String, String>() {
@Override
public String getKey(String trend) throws Exception {
return trend;
}
})
//collect events over a window
.window(TumblingEventTimeWindows.of(Time.seconds(60)))
//return the last event from the window...all elements are the same "Symbol"
.apply(new WindowFunction<String, String, String, TimeWindow>() {
@Override
public void apply(String key, TimeWindow window, Iterable<String> input, Collector<String> out) throws Exception {
out.collect(input.iterator().next().toString());
}
})
.map(new StockSymbolToPriceMapFunction());
streamExecEnv.execute("Retrieve Stock Prices");
}
}
public class StockSymbolToPriceMapFunction extends RichMapFunction<String, String> {
@Override
public String map(String stockSymbol) throws Exception {
final StreamExecutionEnvironment streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment();
streamExecEnv.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
System.out.println("StockSymbolToPriceMapFunction: stockSymbol: " + stockSymbol);
DataStream<String> stockPrices = streamExecEnv.addSource(new LookupStockPrice(stockSymbol));
stockPrices.keyBy(new CustomKeySelector()).addSink(new FlinkKafkaProducer08<String>("localhost:9092", "stockprices", new SimpleStringSchema()));
return "100000";
}
private static class CustomKeySelector implements KeySelector<String, String> {
@Override
public String getKey(String arg0) throws Exception {
return arg0.trim();
}
}
}
public class LookupStockPrice extends RichSourceFunction<String> {
public String stockSymbol = null;
public boolean isRunning = true;
public LookupStockPrice(String inSymbol) {
stockSymbol = inSymbol;
}
@Override
public void open(Configuration parameters) throws Exception {
isRunning = true;
}
@Override
public void cancel() {
isRunning = false;
}
@Override
public void run(SourceFunction.SourceContext<String> ctx)
throws Exception {
String stockPrice = "0";
while (isRunning) {
//TODO: query Google Finance API
stockPrice = Integer.toString((new Random()).nextInt(100)+1);
ctx.collect(stockPrice);
Thread.sleep(10000);
}
}
}
Gelen verilere yanıt olarak dinamik olarak akış oluşturmak mümkün mü? –
Dinamik olarak gelen kayıtları temel alan verileri okuyan ve yayan bir "FlatMapFunction" uygulayabilirsiniz. Örneğin, dosya adlarına sahip bir akışınız varsa, bir 'FlatMapFunction' bu dosyaları açabilir ve verilerini yayınlayabilirsiniz. Bununla birlikte, tüm kayıtların çıktı türleri aynı olmalıdır. Ayrıca, olay zamanı işlem semantiğinin doğru olması zor olabilir, ancak bu dinamik olarak eklenen kaynakların genel bir sorunudur. –
@FabianHueske Benzer bir kullanım durumunu çözüyorum. Yani eğer FlatMapFunction kullanmam gerekiyorsa, dosyayı normal dosya API'sini kullanarak scala/Java kullanarak ve Flink'in readTextFile'ını kullanmadan okumak zorundayız. Nedeni biz StreamMoelBu ortamı düzMap içinde kullanamazsınız. Anlayışım doğru mu? –