spark로 cassandra table scan

10 November 2019

spark와 cassandra가 서로 연동되어 있는 상태에서,
spark를 통해서 cassandra에서 scan하는 예제를 알아보도록 하겠습니다.

먼저 아래와 같이 intellij에서 scala project를 만들어주고 sbt를 통해 dependency를 설정해주도록 합니다.

name := "scalaCassandra"

version := "0.1"

scalaVersion := "2.11.12"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql" % "2.4.4",
  "com.datastax.spark" %% "spark-cassandra-connector" % "2.4.1",
  "org.apache.spark" %% "spark-core" % "2.4.4"
)
이후, main 함수에서 아래와 같이 cassandra와 연동할 수 있게 설정해주고,해당 table의 값을 scan해오도록 합니다.
package org.shashaka.test

import com.datastax.spark.connector._
import org.apache.spark.{SparkConf, SparkContext}

object SparkApp extends App {

  override def main(args: Array[String]) {
    val conf = new SparkConf(true)
      .set("spark.cassandra.connection.host", "cassandra_ip")
      .set("spark.cassandra.auth.username", "cassandra")
      .set("spark.cassandra.auth.password", "cassandra")

    val sc = new SparkContext("local", "app_name", conf)

    val rdd = toSparkContextFunctions(sc).cassandraTable("keyspace_name", "table_name")

    val userMap = rdd.first().get[Map[Int, Int]]("rank");
    println("userMap : " + userMap)

    sc.stop()
  }
}