from csv to kafka by spark
28 March 2020
Spark를 통해 csv파일을 읽고, 해당 내용을 다시 kafka로 write하는 예제를 알아보도록 하겠습니다.
먼저 아래와 같이 sbt 파일에 dependency를 추가해주도록 합니다.
- build.sbt
name := "spark_kafka"
version := "0.1"
scalaVersion := "2.11.12"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.4.5"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.5"
libraryDependencies += "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.4.5"
그리고 아래와 같이 소스코드를 작성해줍니다.
package org.shashaka.io
import org.apache.spark.sql.types.{MapType, StringType, StructType}
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.{SparkConf, SparkContext}
object SparkKafkaProducer {
def main(args: Array[String]) {
System.setProperty("hadoop.home.dir", "C:\\hadoop")
val conf = new SparkConf().setAppName("kafkaWriter").setMaster("local")
val sc = new SparkContext(conf)
val sqlContext = SparkSession.builder().config(conf).getOrCreate()
val fileRows = sc.textFile("C:\\test.csv")
val schema = new StructType()
.add("name", StringType)
.add("duration", StringType)
.add("client_add", StringType)
.add("result_code", StringType)
.add("bytes", MapType(StringType, StringType))
val rowRDD = fileRows.map(_.split(",")).map(x => Row(x(0), x(1), x(2), x(3), Map("test" -> x(4))))
val squidDF = sqlContext.createDataFrame(rowRDD, schema).toJSON
squidDF
.repartition(4)
.write
.format("kafka")
.option("topic", "test")
.option("kafka.bootstrap.servers", "kafka_host:9092")
.save()
sc.stop()
}
}
아래와 같이 csv파일을 읽어, kafka에 json 형태로 출력되는 것을 확인할 수 있습니다.
//test_location.csv
Sally Whittaker,2018,McCarren House,312,3.75
Belinda Jameson,2017,Cushing House,148,3.52
Jeff Smith,2018,Prescott House,17-D,3.20
Sandy Allen,2019,Oliver House,108,3.48
// Kafka output
{"time":"Sandy Allen","duration":"2019","client_add":"Oliver House","result_code":"108","bytes":"3.48"}
{"time":"Belinda Jameson","duration":"2017","client_add":"Cushing House","result_code":"148","bytes":"3.52"}
{"time":"Sally Whittaker","duration":"2018","client_add":"McCarren House","result_code":"312","bytes":"3.75"}
{"time":"Jeff Smith","duration":"2018","client_add":"Prescott House","result_code":"17-D","bytes":"3.20"}
설정은 서버의 용도에 따라 변경해주도록 합니다.