from csv to kafka by spark

28 March 2020

Spark를 통해 csv파일을 읽고, 해당 내용을 다시 kafka로 write하는 예제를 알아보도록 하겠습니다.
먼저 아래와 같이 sbt 파일에 dependency를 추가해주도록 합니다.
- build.sbt

name := "spark_kafka"

version := "0.1"

scalaVersion := "2.11.12"

libraryDependencies += "org.apache.spark" %% "spark-core" % "2.4.5"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.5"
libraryDependencies += "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.4.5"
그리고 아래와 같이 소스코드를 작성해줍니다.
package org.shashaka.io

import org.apache.spark.sql.types.{MapType, StringType, StructType}
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.{SparkConf, SparkContext}

object SparkKafkaProducer {
  def main(args: Array[String]) {

    System.setProperty("hadoop.home.dir", "C:\\hadoop")

    val conf = new SparkConf().setAppName("kafkaWriter").setMaster("local")

    val sc = new SparkContext(conf)

    val sqlContext = SparkSession.builder().config(conf).getOrCreate()

    val fileRows = sc.textFile("C:\\test.csv")

    val schema = new StructType()
      .add("name", StringType)
      .add("duration", StringType)
      .add("client_add", StringType)
      .add("result_code", StringType)
      .add("bytes", MapType(StringType, StringType))

    val rowRDD = fileRows.map(_.split(",")).map(x => Row(x(0), x(1), x(2), x(3), Map("test" -> x(4))))

    val squidDF = sqlContext.createDataFrame(rowRDD, schema).toJSON

    squidDF
      .repartition(4)
      .write
      .format("kafka")
      .option("topic", "test")
      .option("kafka.bootstrap.servers", "kafka_host:9092")
      .save()

    sc.stop()
  }
}


아래와 같이 csv파일을 읽어, kafka에 json 형태로 출력되는 것을 확인할 수 있습니다.

//test_location.csv
Sally Whittaker,2018,McCarren House,312,3.75
Belinda Jameson,2017,Cushing House,148,3.52
Jeff Smith,2018,Prescott House,17-D,3.20
Sandy Allen,2019,Oliver House,108,3.48


// Kafka output
{"time":"Sandy Allen","duration":"2019","client_add":"Oliver House","result_code":"108","bytes":"3.48"}
{"time":"Belinda Jameson","duration":"2017","client_add":"Cushing House","result_code":"148","bytes":"3.52"}
{"time":"Sally Whittaker","duration":"2018","client_add":"McCarren House","result_code":"312","bytes":"3.75"}
{"time":"Jeff Smith","duration":"2018","client_add":"Prescott House","result_code":"17-D","bytes":"3.20"}
설정은 서버의 용도에 따라 변경해주도록 합니다.