본문 바로가기

TIL

[240319] 스파크(spark): 배경 및 기본 구조, 기능

[spark를 이용한 빅데이터 분석 by 안창배 튜터]

 

[1] 스파크(spark): 컴퓨터와 데이터, 메모리, 클라우드 

[2] 스파크(spark): 병렬/분산처리, 샘플링, 분할, Dask, 자동화

 

1. Spark란?

1) 스파크 등장 배경
- 과거엔 대규모 연산 및 데이터 처리를
프로세서 성능 향상에 맡겼으나, 물리적 방열 등 단일 CPU 기능 개선에는 한계가 있었음 

- 하여, 모든 코어가 같은 속도로 동작하는 병렬 CPU 코어를 더 많이 추가하는 방향으로 발전 선회

- 데이터 수집 비용이 저렴해져 데이터 클러스터에서 처리해야 할 만큼 거대해짐 

2) 스파크의 역사

- 요약: 스파크는 많은 변화를 겪어 왔으며, 최소 2022년 이후 자료를 살펴보는 것을 추천 

- UC 버클리 대학교에서 2009년 스파크 연구 프로젝트로 시작
- 1.0 이전 스파크 초기 버전은 함수형 연산 관점에서 API를 정의
- 스파크 1.0 버전부터 구조화된 데이터를 기반으로 동작하는 신규 API인 스파크 SQL 추가
- 이후 DataFrame, 머신러닝 파이프라인, 자동 최적화를 수행하는 구조의 스트리밍 등 더 강력한 구조체 기반 신규 API 추가


3) 아파치 스파크의 철학
- 통합: 데이터 읽기부터 SQL/머신러닝/스트림까지, 같은 연산 엔진과 일관성 있는 API로 수행 가능하도록 설계 (하나의 프레임워크)

- 컴퓨팅 엔진: 데이터를 연산하는 역할만 수행할 뿐 영구 저장소 역할은 수행하지 않음 (저장소 연동 필요)

- 라이브러리: 엔진에서 제공하는 표준 라이브러리와 오픈 소스(서드파트 패키지 형태)로 제공하는 다양한 외부 라이브러리 지원


2. How Spark Works?

1) 스파크의 기본 구조

①  Driver와 Executor

- 관리자인 Driver와 일꾼인 Executor로 구성되며, Cluster Manager가 전반적인 과정을 다 통제함 

└ Driver가 일을 나눠주는 역할을 하는데, 스파크 언어 API를 통해 다양한 언어(Pyspark, Scala 등)로 표현

 실제 일하는 것은  Executor로, 내부 스파크 코드를 수행함

 

② JVM와 Scala, Pyspark


- Spark는 Java를 기반으로 개발되어 JVM(Java Virtual Machine)을 통해 구동

단, 파이썬 코드와 통용되는 Pyspark로도 어지간한 것은 감당 가능해 Java와 Scala까지는 안 가도 됨 

 

③ 분산 처리와 Partition
- Partition이란, 모든 executor가 병렬로 일할 수 있도록 데이터를 쪼갠 단위

 └ 다른 말로 하면, 클러스터의 하나의 물리적 시스템에 있는 row의 모음 (전체 데이터 쪼개놓은 것 중 하나)

 └어느 데이터가 어느 파티션에 들어가도록 직접 조작은 불가하나, 하이레벨 설계만 가능(실행은 스파크가 알아서) 

 └ 특정 Partition의 메모리 부하가 커지면 OOM으로 스파크가 종료될 수 있음

- Repartition을 통해 파티션 재구성 가능하며, 적절한 Repartition으로 밸런스 조정

- Partition, Repartition 사이의 데이터 통신은 네트워크 통신이므로, 비싼 연산임(ex. 정렬)
 └ 가능하면 partition 내부에서 연산이 가능하도록 하는 것이 좋음


2) 스파크의 특징 


Transformation: 게으른 컴퓨터(Lazy computation)
- Spark는 바로바로 주어진 변환Transformation을 수행하지 않음

- 일을 미루고 여러 변환들을 차곡차곡 쌓아둔 뒤, 한 번에 최적화 수행 

 

② Action: 리니지(Lineage)와 Optimization

- 변환이 필요한 일을 수행하도록 구동하는 작업이 Action

 count, collect, take, top, show, write, toDF 등

- 이때 스파크가 의도적으로 미루어둔 쌓인 일은 Lineage로, 전체 Lineage를 보고 필요 없는 연산은 제거

- Lineage로 병목현상이나 OOM이 확인되기도 함 


③ RDD, Dataset, DataFrame


- 2014년 정식 출시된 Spark v1은 RDD API를 이용하여 데이터를 처리했지만, 불편함이 많았음 
- Spark 1.3d에서 처리 속도 증가를 위해 DataFrame이 소개되었으며, 현재 PySpark는 대부분 이를 기반으로 함

- Spark 1.6에서 Dataset이 추가되었으며 다양한 용도가 있음 


3) Spark 작업의 수행 과정


① DataFrame 등을 이용해 코드 작성
② 정상적인 코드라면 스파크가 논리적 실행 계획으로 변환
   └ Driver와 Execute의 정보를 고려하지 않고, 논리적인 가능성만 참조
③ 논리적 실행 계획을 물리적 실행 계획으로 변환

   └ 변환 과정에서 테이블의 크기와 파티션 수 등을 고려해 추가 최적화 가능 여부 확인
④ 클러스터에서 물리적 실행 계획(RDD 처리)을 실행

4) Spark 데이터 타입
- Pandas와는 미묘하게 다르지만, 유사

5) 스파크 대시보드

 

 

3. 스파크 꿀팁 기능

1) 브로드 캐스트

브로드캐스트(좌)와 어큐물레이터(우)


- 모든 Worker에게 큰 규모의 입력 데이터셋을 효율적으로 제공할 때 사용하는 방법
- 메모리를 효율적으로 사용하는 방법론 중 하나

- Spark에서 브로드캐스트는 모든 Executor가 접근 가능한 공통의 변수 

 └ spark.sparkContext.broadcast()

 └ 너무 큰 데이터는 부하가 심해질 수 있음 

- 모든 Executor가 공유하되 반대로 그 값이 가변하는 값은 어큐물레이터

(cf) 브로드 캐스트는 numpy나 pandas에서도 존재하는 개념 ex) df['num'] + 1


2) Join
- 두 개의 테이블을 병합하는 것으로 pandas의 merge 같은 기능 

- 테이블 크기에 따라 전략이 다르나 결국 클러스터 간 통신을 하느냐, 마느냐의 문제

- Big to Big join: 셔플 조인이라고도 부르며, 두 개의 큰 테이블을 병합

모든 Executor가 다른 Executor와 통신
데이터가 어떻게 분할되어있느냐에 따라 혼잡도가 달라짐


- Big to Small join: 하나의 큰 테이블과 다른 작은 테이블을 병합
└ 작은 테이블을 복사 떠서 모든 Executor에게 전송 
처음에 대규모 통신을 하고, 이후 노드간 통신이 필요없어 보통 더 빠른 작업이 가능 


- Small to Small join: 2개의 작은 테이블 병합. 시간도 오래 안걸리고 공간 차지도 적음 
- 기타 Join 팁
미리 데이터가 잘 분할되어 있다면, 셔플 조인도 빠르게 수행 가능 

어떤 Join은 Filter의 역할을 수행하기 때문에, Join 순서에 따라 속도가 달라질 수도 있음

 

3) 고급 I/O
- Inferschema: 데이터 타입을 자동으로 추론하는 기능

└ 다소 느리고 의도하지 않은 타입이 할당되기도 하므로 아예 스키마를 강제하는 것도 좋은 방법 
-  Write options
mode: 'overwrite'라고 명시하지 않으면, 기존의 파일이 있을 때 에러가 남 
partitionBy: 어떤 칼럼을 기준으로 데이터를 분할해 저장할지 명시
maxPartitionBytes: 분할된 파일의 최대 크기를 조정

- 파일 크기 관리가 굉장히 중요한데, 일반적으로 500MB ~ 2GB 정도가 적절

은 파일을 많이 생성하면 메타 데이터 관리에 부하 발생

파일이 커지면 일부 row만 필요하더라도 전체 데이터 블록 읽어야 하기 때문에 비효율적

- 여러 익스큐터가 같은 파일을 동시에 읽을 수 없지만, 여러 파일을 동시에 읽을 수는 있음 
쓰기 작업 전 파티션 수를 조절로 병렬 처리 파일 수 제어 가능 
- 복합 데이터 유형(nested)은 모든 파일 포맷에서 적합하지 않으므로 비추천