On the journey of

[PySPARK] Spark의 정형 API (1) 본문

Experiences & Study/PySPARK & Data Engineering

[PySPARK] Spark의 정형 API (1)

dlrpskdi 2023. 8. 31. 00:06

3장 Apache Spark’s Structured APIs

소개글

  • Apache Spark가 어떻게 생겨났고, Spark의 버전들의 특성을 간략하게 소개하고 있다.
  • 새로운 API를 살펴보기 전에 RDD API 모델을 살펴보자고 한다.
1. Spark: What’s Underneath an RDD?

RDD는 스파크에서 가장 기초적인 추상화이고, RDD에는 3가지의 중요한 특성이 있다

  1. 종속성 (Dependencies)
  2. 파티션 (Partitions)
  3. 계산 기능 (Compute function) => Iterator[T]

3가지 모두 간단한 RDD 프로그래밍 API 모델에 필수적이다.

  • 종속성

스파크에 입력으로 RDD를 구성하는 방법을 지시하는 종속성 목록이 필요하다.

결과를 재현하기 위해 필요한 경우 Spark는 이러한 종속성에서 RDD를 재생성하고 RDD에 대한 작업을 복제할 수 있습니다. (이러한 특징은 RDD의 탄력성을 부여한다)

  • 파티션

파티션은 Spark에 작업을 분할하여 실행자 간에 파티션의 계산을 병렬화 할 수 있는 기능을 제공한다. 스파크는 로컬리티 정보를 사용하여 데이터에 가까운 실행자에게 작업을 보낸다.

그렇게 하면 네트워크를 통해 전송되는 데이터가 줄어든다.

  • 계산 기능

RDD 계산 기능은 RDD에 데이터가 저장될 수 있도록 Iterator 기능을 제공한다.

이런 RDD에 몇 가지 문제점이 있는데,

  • 계산 기능이 불명확하다 (Spark는 오직 lambda 표현만 본다)
  • Iterator[T] 데이터 타입 또한 불명확하다 (Spark는 Python에 있는 일반 개체라는 것만 안다)
  • 의도를 전혀 이해하지 못한다
  • Spark는 T에 있는 특정 데이터 타입의 지식이 없다 (특정 column 개체 접근 할 수 없음)

⇒ 이러한 불투명성은 Spark가 효율적인 query문을 작성하는데 방해가 된다.

2. Structuring Spark
  • Clarity and Simplicity (명료함과 단순성)

Spark 2.x는 Spark를 구성하는 몇 개의 key 스키마로 소개되는데, 데이터 분석에서 패턴을 사용하면서 발견되는 계산은 filtering, selecting, counting.. 과같은 high level 작업으로 표현된다.

  • Specificity (구체성)

DSL 연산 세트를 통하여, Spark's에 제공된 언어(Java, Python, Spark, R, and SQL)에서 API를 사용 가능하게 된다.

이 연산자를 사용하면 Spark에 데이터로 무엇을 계산하고 싶은지 알릴 수 있으며, 결과적으로 실행을 위한 효율적인 쿼리를 구성할 수 있다.

명령과 구조의 마지막 스키마는 데이터가 SQL 테이블이나 스프레드시트 같은 형식으로 정렬될 수 있도록 해준다.

3. Key Merits and Benefits
  • Structure는 스파크 구성 요소 전반에 걸쳐 향상된 성능과 공간 효율성을 포함한 여러 가지 이점을 제공한다 (표현성, 단순성, 구성성 및 일관성)
  • 표현성과 구성성이 있는 코드 (각 이름에 대한 모든 연령을 다음 기준으로 집계)
  •  

⇒ 스파크에게 일련의 람다 함수를 사용하여 키를 집계하고 평균을 계산하는 방법이 암호화되어 있고 읽기 어렵다

  • 표현력이 뛰어나고 단순한 코드 (높은 수준의 DSL 연산자와 DataFrame API를 사용하여 동일한 쿼리를 표현한 코드)
  •  

  1. 사람들의 이름으로 그룹화하고
  2. 나이를 집계한 다음
  3. 같은 이름을 가진 모든 사람들의 평균 나이 계산

⇒ 쿼리를 구성하기 위해 이러한 연산자를 사용함

⇒ 고급 연산자를 단일 단순 쿼리로 사용하여 전체 계산을 구성

고수준 DSL연산자들과 데이터프레임 API를 쓴다면?

  • 스파크에게 무엇을 할지 알려주므로 표현력이 높으며 간단하다.
  • 스파크는 이런 쿼리를 파악해 사용자의 의도를 이해할 수 있기에 연삭 최적화가 가능하다.
  • 스파크 상위 수준 API는 컴포넌트들과 언어를 통틀어 일관성을 갖는다.
    • ex. 아래 스칼라 코드는 앞의 파이썬 코드와 같은 일을 하면서 형태도 비슷하다.
    •  

  • 이러한 단순성이나 표현력을 상위 수준 구조화 API 위에 구축된 스파크 SQL 엔진 덕택에 가능한 것이다.
  • 추후 스파크 SQL 엔진에 대해 살펴볼 예정이다.(해당 과정에서는 자주 쓰이는 연산을 위한 API와 DSL이 분석에 어떻게 쓰이는지 살펴본다.)
데이터 프레임 API
  • 스파크 데이터프레임
    • 구조, 포맷 등 특정 연산 등에 있어 판다스 데이터 프레임의 영향을 받았다.
    • 이름 있는 컬럼과 스키마를 가진 분산 인메모리 테이블처럼 동작한다.
    • integer, string, array, real, data, timestamp 등의 데이터 타입을 가질 수 있다.
    • ex. 데이터 프레임 표 형태 포맷

스파크의 기본 데이터 타입
  • 스파크는 지원 프로그래밍 언어에 맞게 기본적인 내부 데이터 타입을 지원한다.
    • ex. In 스칼라, 어떤 컬럼 이름이 String, Byte 등 타입 중 하나가 되도록 선언할 수 있다.
  • 스파크에서 지원하는 기본 데이터 타입 (파이썬)

스파크의 파이썬 정형화 데이터 타입

데이터타입 스칼라에서 할당되는 값 초기 생성 API

BinaryType bytearry BinaryType()
TimestampType datatime.datatime TimestampType()
DataType datatime.data Datatype()
ArrayType list,tuple,array 중 ArrayType(datatype,[nullable])
MapType dict MapType(keyType, valueType, [nullable])
StructType list 혹은 tuple StructType([fileds])
StructField 해당 필드와 맞는 값의 타입 StructField(name,dataType,[nullable])

스파크에서 스키마 = 데이터 프레임을 위해 칼럼이름 과 연관된 데이터 타입을 정의한 것.

: 외부 데이터 소스에서 구조화된 데이터를 읽어 들일 때 쓰이게 됨.

데이터 소스에서 큰 파일을 읽어야 한다면 가능한 한 스키마를 미리 정의해주는 것이 좋다.

( 왜냐하면 스파크가 데이터 타입을 추측해야 하는 책임을 덜어주고, 스키마를 확정하기 위해 별도의 잡을 만드는 것을 방지하고(비용과 시간의 절약), 데이터가 스키마와 맞지 않는 경우 문제를 조기에 발견 할 수 있음)

스키마를 정의하는 두 가지 방법
  1. 프로그래밍 스타일
schema=StructType([StructField("author",StringType(),False),
StructField("title",StringType(),False),
StructField("pages",IntegerType(),False)])

2. DDL

schema=" 'author' STRING, 'title' STRING, 'pages' INT”
JSON 파일에서 데이터 불러오기

1. blogs.json

{"Id":1, "First": "Jules", "Last":"Damji", "Url":"https://tinyurl.1", "Published":"1/4/2016", "Hits": 4535, "Campaigns": ["twitter", "LinkedIn"]}
{"Id":2, "First": "Brooke","Last": "Wenig","Url": "https://tinyurl.2", "Published": "5/5/2018", "Hits":8908, "Campaigns": ["twitter", "LinkedIn"]}
{"Id": 3, "First": "Denny", "Last": "Lee", "Url": "https://tinyurl.3","Published": "6/7/2019","Hits": 7659, "Campaigns": ["web", "twitter", "FB", "LinkedIn"]}
{"Id": 4, "First":"Tathagata", "Last": "Das","Url": "https://tinyurl.4", "Published": "5/12/2018", "Hits": 10568, "Campaigns": ["twitter", "FB"]}
{"Id": 5, "First": "Matei","Last": "Zaharia","Url": "https://tinyurl.5", "Published": "5/14/2014", "Hits": 40578, "Campaigns": ["web", "twitter", "FB", "LinkedIn"]}
{"Id":6,  "First": "Reynold", "Last": "Xin", "Url": "https://tinyurl.6", "Published": "3/2/2015", "Hits": 25568, "Campaigns": ["twitter", "LinkedIn"] }

2. 예제 schema

blogs_df = spark.read.schema(schema).json("blogs.json")

blogs_df.show()

print(blogs_df.printSchema())

print(blogs_df.schema)

칼럼과 표현식

칼럼

  • 어떤 특정한 타입의 필드를 나타내는 개념
    • 개념적으로 판다스나 R에서의 데이터 프레임이나 RDBMS 테이블의 칼럼과 유사
  • 사용자는 이름으로 칼럼을 나열해 볼 수도 있고, 관계형 표현이나 계산식 형태의 표현식으로 그 값들에 연산을 수행할 수 있음
  • public 메소드를 가진 객체로 표현 (칼럼 타입)
  • 논리식이나 수학 표현식도 사용 가능
    • expr(”columnName * 5”) (columnName이 integer, string 등의 타입일 경우)
    • (expr(”columnName - 5”) > col(anothercolumnName))
  • column으로 가능한 예제
# col으로 특정 칼럼에 접근하면 Column 타입을 되돌려 준다.
print(blogs_df["Id"])

# 값 계산을 위한 표현식 사용
blogs_df.select(expr("Hits * 2")).show(2)
# 혹은 col을 사용한 계산
blogs_df.select(col("Hits") * 2).show(2)

# 블로그 우수 방문자를 계산하기 위한 식 표현
# 이 코드는 뒤의 식에 맞는 값으로 "Big Hitters"라는 이름의 새로운 칼럼을 추가한다.
blogs_df.withColumn("Big Hitters", (expr("Hits > 10000"))).show()

# 새 칼럼을 연결하여 새로운 칼럼을 만들고 그 칼럼을 보여준다.
blogs_df.withColumn("AuthorsId", (concat(expr("First"), expr("Last"), expr("Id")))).select(col("AuthorsId")).show(4)

# 이 문장들은 모두 동일한 결과를 보여주며 표현만 약간씩 다르다.
blogs_df.select(expr("Hits")).show(2)
blogs_df.select(col("Hits")).show(2)
blogs_df.select("Hits").show(2)

# "Id" 칼럼값에 따라 역순으로 정렬한다.
blogs_df.sort(col("Id").desc()).show()

Column 객체는 단독으로 존재할 수 없음

→ 각 칼럼은 한 레코드의 로우의 일부분이며, 모든 로우가 합쳐져서 하나의 데이터 프레임을 구성

로우

하나의 행은 하나 이상의 동일한 타입 or 다른 타입의 칼럼을 갖고 있는 로우(row) 객체로 표현되고, 인덱스는 0부터 시작한다.

Row 객체들은 빠른 탐색을 위해 데이터프레임으로 만들어서 사용되기도 한다.

데이터 프레임을 읽어들이는게 더 일반적이며 스키마를 이용하는 것이 데이터 프레임 작성에 더 빠르고 효율적이다.

자주 쓰이는 데이터 프레임 작업들

DataFrameReader

JSON, CSV, 파케이(Parquet), 텍스트, 에이브로, ORC 등의 포멧을 데이터 소스에서 데이터 프레임으로 가지고 온다.

DataFrameWriter

동일하게 특정 포멧의 데이터 소스에서 데이터프레임으로 써서 내보낸다.

DataFrameReader와 DataFrameWriter 사용하기

고수준의 추상화 및 NoSQL, RDBMS, 아파치 카프카, 키네시스 등의 커뮤니티의 공헌으로 스파크에서 읽고 쓰는 작업은 쉬운편이다. 큰 데이터 프레임은 스키마를 지정하고 DataFrameReader 클래스를 통해서 읽는것이 효과적이다.

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Chapter3").getOrCreate()

from pyspark.sql.types import *

# define schema for our data
fire_schema = StructType([
		StructField('CallNumber', IntegerType(), True),
    StructField('UnitID', StringType(), True),
    StructField('IncidentNumber', IntegerType(), True),
    StructField('CallType', StringType(), True),
    StructField('CallDate', StringType(), True),
    StructField('WatchDate', StringType(), True),
    StructField('CallFinalDisposition', StringType(), True),
    StructField('AvailableDtTm', StringType(), True),
    StructField('Address', StringType(), True),
    StructField('City', StringType(), True),
    StructField('Zipcode', IntegerType(), True),
    StructField('Battalion', StringType(), True),
    StructField('StationArea', StringType(), True),
    StructField('Box', StringType(), True),
    StructField('OriginalPriority', StringType(), True),
    StructField('Priority', StringType(), True),
    StructField('FinalPriority', IntegerType(), True),
    StructField('ALSUnit', BooleanType(), True),
    StructField('CallTypeGroup', StringType(), True), 
    StructField('NumAlarms', IntegerType(), True),
    StructField('UnitType', StringType(), True),
    StructField('UnitSequenceInCallDispatch', IntegerType(), True),
    StructField('FirePreventionDistrict', StringType(), True),
    StructField('SupervisorDistrict', StringType(), True),
    StructField('Neighborhood', StringType(), True),
    StructField('Location', StringType(), True),
    StructField('RowID', StringType(), True),
    StructField('Delay', FloatType(), True)
])

sf_frie_file = '/Users/User/vscode/Spark/spark-3.3.2-bin-hadoop3/data/sf-fire-calls.csv' 
fire_df = spark.read.csv(sf_frie_file, header=True, schema=fire_schema)
fire_df.show() # 11.3s

spark.read.csv() 함수는 CSV파일을 읽어서 row객체와 스키마에 맞는 타입의 이름이 있는 칼럼들로 이루어진 데이터 프레임으로 되돌려준다.

DataFrameWriter으로 원하는 포맷으로 쓸 수 있다. 기본 포멧은 파케이고 스내피로 압축한다.

파케이로 쓰였다면 스키마는 파케이 메타데이터의 일부로 보존이 가능하며 수동으로 스키마를 적용할 필요가 없어진다.