Creating Datasets
10 September 2019
데이터 세트는 RDD와 유사하지만, 데이터 세트는 Java 직렬화 또는 Kryo 대신 특수 인코더를 사용하여 객체를 직렬화하고 네트워크를 통해 전송합니다.
인코더와 표준적인 직렬화는 둘다 객체를 바이트로 변환하는 역할을 합니다.
하지만 인코더는 코드를 동적으로 생성하며 Spark가 바이트를 객체로 역직렬화하지 않고 필터링, 정렬 및 해싱과 같은 많은 작업을 수행 할 수있는 형식을 사용합니다.
import java.util.Arrays;
import java.util.Collections;
import java.io.Serializable;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
public static class Person implements Serializable {
private String name;
private int age;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
}
// Create an instance of a Bean class
Person person = new Person();
person.setName("Andy");
person.setAge(32);
// Encoders are created for Java beans
Encoder personEncoder = Encoders.bean(Person.class);
Dataset javaBeanDS = spark.createDataset(
Collections.singletonList(person),
personEncoder
);
javaBeanDS.show();
// +---+----+
// |age|name|
// +---+----+
// | 32|Andy|
// +---+----+
// Encoders for most common types are provided in class Encoders
Encoder<Integer> integerEncoder = Encoders.INT();
Dataset<Integer> primitiveDS = spark.createDataset(Arrays.asList(1, 2, 3), integerEncoder);
Dataset<Integer> transformedDS = primitiveDS.map(
(MapFunction<Integer, Integer>) value -> value + 1,
integerEncoder);
transformedDS.collect(); // Returns [2, 3, 4]
// DataFrames can be converted to a Dataset by providing a class. Mapping based on name
String path = "examples/src/main/resources/people.json";
Dataset<Person> peopleDS = spark.read().json(path).as(personEncoder);
peopleDS.show();
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
https://spark.apache.org/docs/latest/sql-getting-started.html#creating-datasets