spark로 cassandra insert

09 November 2019

spark와 cassandra가 서로 연동되어 있는 상태에서,
spark를 통해서 cassandra에 insert 하는 예제를 알아보도록 하겠습니다.

먼저 아래와 같이 keyspace를 만들어주고,
현재 빌보드를 참고하여 10곡의 노래를 cassandra에 insert구문을 통해 넣어주도록 합니다.

CREATE TABLE test.kv(key text PRIMARY KEY, value int);

CREATE TABLE test.songs (
    id int PRIMARY KEY,
    name text
);

insert into songs(id, name) values (1,'Lose You To Love Me');
insert into songs(id, name) values (2,'Someone You Loved');
insert into songs(id, name) values (3,'Circles');
insert into songs(id, name) values (4,'Senorita');
insert into songs(id, name) values (5,'Truth Hurts');
insert into songs(id, name) values (6,'Good As Hell');
insert into songs(id, name) values (7,'Follow God');
insert into songs(id, name) values (8,'No Guidance');
insert into songs(id, name) values (9,'Panini');
insert into songs(id, name) values (10,'Bad Guy');
하지만, spark는 공식 이미지가 없으니, openjdk:8-alpine을 통해서 만들어주도록 합니다.
아래의 url은 이후에는 바뀔 수 있으므로, spark 다운로드 사이트에서 url을 체크해 적용해주도록 합니다.
또한, openjdk 이미지에는 bash가 설치되어 있지 않으니, 아래와 같이 apk add를 통해 설치를 해줍니다.
spark-shell을 실행시키기 위해서 bash가 필요합니다.

아래와 같이 user와 해당 user가 선호하는 곡의 평점을 매칭시킬 수 있도록,
user_rankings 테이블을 만들어주도록 합니다.
CREATE TABLE test.user_rankings (
    id int PRIMARY KEY,
    rank map
);
이후, 아래와 같이 spark scala shell에서 random 함수를 이용하여,
1000명의 user가 선호하는 노래의 평점을 random으로 넣어주도록 합니다.
import com.datastax.spark.connector._
import org.apache.spark.sql.cassandra._
import scala.collection.mutable._

val rdd = sc.cassandraTable("test", "user_rankings")

for (a <- 1 to 1000) {
  val map = new HashMap[Integer, Integer];
  val random = new scala.util.Random();
  
  for (i <- 1 to 10) {
  val id = random.nextInt(10) + 1
  val value = random.nextInt(10) + 1;
    map.put(id, value);
  }
  
  val collection = sc.parallelize(Seq((a, map)))
collection.saveToCassandra("test", "user_rankings", SomeColumns("id", "rank"))       
}