Interoperating with RDDs

10 September 2019

Spark SQL은 기존 RDD를 데이터 세트로 변환하는 두 가지 방법을 지원합니다.
첫 번째 방법은 리플렉션을 사용하여 특정 유형의 객체를 포함하는 RDD의 스키마를 유추합니다.
이 리플렉션 기반 접근 방식은 보다 간결한 코드로 연결되며 Spark 애플리케이션을 작성하는 동안 이미 스키마를 알고있을 때 잘 작동합니다.

데이터 집합을 만드는 두 번째 방법은 스키마를 구성한 다음 기존 RDD에 적용 할 수 있는 프로그래밍 인터페이스를 사용하는 것입니다.
이 방법은 더 장황하지만 런타임까지 열과 해당 유형을 알 수 없는 경우에도 데이터 집합을 구성 할 수 있습니다.

Inferring the Schema Using Reflection


Spark SQL은 JavaBeans RDD를 DataFrame으로 자동 변환하는 기능을 지원합니다.
리플렉션을 사용하여 얻은 BeanInfo는 테이블의 스키마를 정의합니다.
현재 Spark SQL은 맵 필드가 포함 된 JavaBean은 지원하지 않습니다.
하지만 JavaBeans의 List와 Array 필드는 지원됩니다.
Serializable을 구현하고 모든 필드에 대해 getter 및 setter를 갖는 클래스를 작성하여 JavaBean을 작성할 수 있습니다.

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;

// Create an RDD of Person objects from a text file
JavaRDD<Person> peopleRDD = spark.read()
  .textFile("examples/src/main/resources/people.txt")
  .javaRDD()
  .map(line -> {
    String[] parts = line.split(",");
    Person person = new Person();
    person.setName(parts[0]);
    person.setAge(Integer.parseInt(parts[1].trim()));
    return person;
  });

// Apply a schema to an RDD of JavaBeans to get a DataFrame
Dataset<Row> peopleDF = spark.createDataFrame(peopleRDD, Person.class);
// Register the DataFrame as a temporary view
peopleDF.createOrReplaceTempView("people");

// SQL statements can be run by using the sql methods provided by spark
Dataset<Row> teenagersDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19");

// The columns of a row in the result can be accessed by field index
Encoder<String> stringEncoder = Encoders.STRING();
Dataset<String> teenagerNamesByIndexDF = teenagersDF.map(
    (MapFunction<Row, String>) row -> "Name: " + row.getString(0),
    stringEncoder);
teenagerNamesByIndexDF.show();
// +------------+
// |       value|
// +------------+
// |Name: Justin|
// +------------+

// or by field name
Dataset<String> teenagerNamesByFieldDF = teenagersDF.map(
    (MapFunction<Row, String>) row -> "Name: " + row.<String>getAs("name"),
    stringEncoder);
teenagerNamesByFieldDF.show();
// +------------+
// |       value|
// +------------+
// |Name: Justin|
// +------------+

Programmatically Specifying the Schema


만약 javaBean 클래스가 수행 전에 정의될 수 없는 경우, Dataset<Row>는 세가지 스텝의 프로그래밍 방식으로 생성될 수 있습니다.

1. 기존 RDD에서 Row의 RDD를 작성하십시오.
2. 1 단계에서 작성된 RDD의 행 구조와 일치하는 StructType으로 표시되는 스키마를 작성하십시오.
3. SparkSession에서 제공하는 createDataFrame 메소드를 통해 Row의 RDD에 스키마를 적용하십시오.

예를 들어 :
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

// Create an RDD
JavaRDD<String> peopleRDD = spark.sparkContext()
  .textFile("examples/src/main/resources/people.txt", 1)
  .toJavaRDD();

// The schema is encoded in a string
String schemaString = "name age";

// Generate the schema based on the string of schema
List<StructField> fields = new ArrayList<>();
for (String fieldName : schemaString.split(" ")) {
  StructField field = DataTypes.createStructField(fieldName, DataTypes.StringType, true);
  fields.add(field);
}
StructType schema = DataTypes.createStructType(fields);

// Convert records of the RDD (people) to Rows
JavaRDD<Row> rowRDD = peopleRDD.map((Function<String, Row>) record -> {
  String[] attributes = record.split(",");
  return RowFactory.create(attributes[0], attributes[1].trim());
});

// Apply the schema to the RDD
Dataset<Row> peopleDataFrame = spark.createDataFrame(rowRDD, schema);

// Creates a temporary view using the DataFrame
peopleDataFrame.createOrReplaceTempView("people");

// SQL can be run over a temporary view created using DataFrames
Dataset<Row> results = spark.sql("SELECT name FROM people");

// The results of SQL queries are DataFrames and support all the normal RDD operations
// The columns of a row in the result can be accessed by field index or by field name
Dataset<String> namesDS = results.map(
    (MapFunction<Row, String>) row -> "Name: " + row.getString(0),
    Encoders.STRING());
namesDS.show();
// +-------------+
// |        value|
// +-------------+
// |Name: Michael|
// |   Name: Andy|
// | Name: Justin|
// +-------------+

https://spark.apache.org/docs/latest/sql-getting-started.html#interoperating-with-rdds

이 문서는 개인적인 목적이나 배포하기 위해서 복사할 수 있다. 출력물이든 디지털 문서든 각 복사본에 어떤 비용도 청구할 수 없고 모든 복사본에는 이 카피라이트 문구가 있어야 한다.