6

Kafka'dan bir veri akışı yüklemek için DataFrame/Dataset API'sine dayalı Spark-Streaming'i kullanarak structured streaming approach'u kullanmaya çalışıyorum.Yapılandırılmış Akış kullanarak Kafka'dan JSON biçiminde kayıtlar nasıl okunur?

kullanmak:

  • Kıvılcım 2.10
  • Kafka'nın 0.10
  • kıvılcım SQL-kafka-0-10

Kıvılcım Kafka'nın DataSource tanımlamıştır altta yatan şema:

|key|value|topic|partition|offset|timestamp|timestampType| 

Verilerim json biçiminde gelir ve sütununda saklanırlar. Değer sütunundan temel alınan şemayı ayıklamak ve alınan veri çerçevesini değerinde depolanan sütunlara güncelleştirme yöntemini nasıl araştırıyorum? Aşağıda yaklaşım denedi ama çalışmıyor:

Burada çünkü akışın yaratılış zamanında İstisna org.apache.spark.sql.AnalysisException: Can't extract value from value#337; alıyorum
val columns = Array("column1", "column2") // column names 
val rawKafkaDF = sparkSession.sqlContext.readStream 
    .format("kafka") 
    .option("kafka.bootstrap.servers","localhost:9092") 
    .option("subscribe",topic) 
    .load() 
    val columnsToSelect = columns.map(x => new Column("value." + x)) 
    val kafkaDF = rawKafkaDF.select(columnsToSelect:_*) 

    // some analytics using stream dataframe kafkaDF 

    val query = kafkaDF.writeStream.format("console").start() 
    query.awaitTermination() 

, içeride bilinmeyen değerler ...

Herhangi bir öneriniz var mı ?

cevap

6

Spark açısından value sadece bir bayt dizisidir. Serileştirme formatı veya içeriği hakkında bilgisi yoktur. Dosyayı ayıklamak için önce onu ayrıştırmanız gerekir.

Veriler JSON dizesi olarak serileştirilmişse, iki seçeneğiniz vardır. , Yol ile özü alanları get_json_object kullanılarak

import org.apache.spark.sql.types._ 
import org.apache.spark.sql.functions.from_json 

val schema: StructType = StructType(Seq(
    StructField("column1", ???), 
    StructField("column2", ???) 
)) 

rawKafkaDF.select(from_json($"value".cast(StringType), schema)) 

veya StringType için cast: Sen StringType için castvalue ve from_json kullanmak ve bir şema sağlayabilir

import org.apache.spark.sql.functions.get_json_object 

val columns: Seq[String] = ??? 

val exprs = columns.map(c => get_json_object($"value", s"$$.$c")) 

rawKafkaDF.select(exprs: _*) 

ve üstü istenen türlerine cast.