[spark를 이용한 빅데이터 분석 by 안창배 튜터]
[1] 스파크(spark): 컴퓨터와 데이터, 메모리, 클라우드
1. 병렬 처리
1) 병렬 처리란?
- 여러 개의 코어에서 동시 다발적으로 일을 나누어서 수행하는 것이 병렬처리
- 주로 단일 컴퓨터(머신) 내의 여러 코어나 프로세서를 사용하여 작업을 동시에 수행
- 파이썬은 기본적으로 1개의 코어를 사용하나 특정 명령어 사용 시 여러 개 코어 사용 가능
└ 파이썬에서는 Multiprocessing이나 joblib이 흔히 사용
└ sklearn의 njobs가 코어 개수를 제어하는 기능
2) 분산 처리
- 네트워크를 통해 연결된 여러 대의 컴퓨터(노드)가 각각의 작업을 수행하는 것
- Spark를 포함해 Ray, Dask 등의 라이브러리 사용 가능
🖥️ CPU vs GPU
- 스파크는 기본적으로 CPU를 사용
- LLM의 발달로 GPU에 대한 수요가 크게 늘었고, 스파크도 GPU 기반 병렬처리를 지원(단, 별도 영역)
3) 병렬 처리 vs. 벡터화(Vectorize)
- 전처리 단계에서 벡터화(Vectorize) 여부에 따라 수십 수백배 이상 속도 차이 발생
- 한 기기의 메모리 안에서 처리가 가능한 규모라면, Spark보다는 연산 최적화하는 것을 권장
- Numpy(Pandas 포함)는 벡터화 연산을 통해 단순 반복작업을 굉장히 빠르게 처리
└ 특히 for문의 순회를 최적화하는데 많이 사용
└ 개별 머신은 벡터화를 수행하여 속도 최적화
4) 병렬처리의 기술 (참고)
- Pickling과 Job broker
└ Pickling: 스파크 포함 분산/병렬처리 도구는 내부에서 데이터를 Picklie로 변환하여 송수신 함
└ Job broke: 분산/병렬 처리시 어떤 프로세서에 어떤 데이터를 보내고, 어떤 작업을, 어떻게 취합할지 관리하는 매니저
· 스파크에서는 클러스터 매니저Cluster Manager가 Yarn, Mesos, Kube 등 이용
· 병렬 처리에서 명시적으로 드러나는 매니저는 없지만, 내부적으로 해당 기능이 필요
· Airflow의 경우 Celery나 Rabbit MQ 등을 사용할 수 있습니다
- 스레드와 프로세스
└ 프로세스는 독립된 메모리 공간을 가지는 반면, 스레드는 같은 메모리 공간을 공유한다는 것이 가장 큰 차이
└ 파이썬은 멀티프로세싱이 일반적이며, 메모리가 독립돼 있어 각 프로세스가 자원을 공유하지 못 하는 것이 특징
- Joblib과 머신러닝
└ Joblib: 파이썬 제공하는 병렬처리 패키지 중 가장 쉽고 안정적으로 쓸 수 있는 패키지(in Sklearn)
└ k개의 멀티프로세싱을 할 때 k개 완료되어야 다음 순회로 넘어가 약간 비효율적인 것이 단점
· 20개 작업을 4개의 프로세서로 멀티프로세싱을 하면? 1~4 끝나야 5~8로 넘어감
└ 그 외 대안으로는 multiprocessing 패키지도 많이 사용
· 주피터 노트북에서 안되는 것이 단점
2. 대용량 데이터 처리하는 법 (Without Spark)
1) 샘플링(Sampling)
① Random sampling
- 가장 쉽고 단순한 방법이나 문제가 많아 비추천
- 표본이 우연히 편향될 수 있고, 매번 결과가 달라지는 것이 큰 단점
② Stratified sampling (층화추출)
- 특정 집단 편향을 줄이기 위해 집단별로 특정 비율을 뽑는 층화추출 방법
- 머신러닝 내 분류Classification 문제에서 자주 다룸
- 랜덤 샘플링과 유사하게, 매번 결과가 달라진다는 문제가 있음
③ Systematic sampling (체계적/계통 샘플링) ★
- ID와 같이, 패턴이 없거나 순서에 영향을 받지 않는 칼럼을 기준으로 추출하는 방법론
└ ID 등 기준 칼럼에 패턴이 있는지는 한 번 검사해보는 것을 추천
- 건너뛰는 단위는 소수(3, 5, 7, 11 등)를 사용하는 것이 특정 패턴을 피하기에 좋음
└ 예를 들어 짝수를 사용할 경우, 짝/홀에 대한 패턴이 잡힐 수 있기 때문
▶ 샘플링 시나리오 사례: 커머스의 캠페인 매출 효과 분석 (Spark 활용)
- 시나리오
└ 10만명의 유저가 상품 X와 관련된 캠페인 C에 참여
└ 상품 X는 다른 프로모션 P에서도 활용이 되었고, 오가닉 판매도 있음
└ 상품 X의 판매에 대해 캠페인 C가 얼마나 기여했는지 정량적인 측정이 필요한 상황
- 분석 계획
(1) 캠페인에 참여한 유저 목록을 테이블로 추출
(2) 각 유저에 대해, 최근(보통 1주일) 해당 상품에 어떤 경로로 얼마나 노출되었는지 집계
(3) 매출액을 각 비율에 맞추어 분배합니다
└ 특정 유저의 매출이 10만원이면, 기여도는 프로모션/캠페인 각 2.5만원(25%), 자연 노출 5만원(50%)
└ 최근 노출에 가중치를 주거나, 세션 내 구매면 바로 직전 노출에 기여도를 할당하는 등 variation 가능
(4) 각 유저에 대한 복잡한 연산이 들어가므로, 일부 유저에 한 해 먼저 검사하며 로을 개발
(5) 전체 유저에 대해서는 Spark나 분할처리 사용
2) 분할 처리
- 컴퓨터 메모리에 전체 데이터의 10%가 올라가는 것이 한계라면, 10%씩 나누어 10번 작업하는 것
- 샘플링과 궁합이 좋은 방법론
- 데이터가 완벽하게 분할되는 경우에 한 해 가능(소셜 네트워크와 같은 그래프는 독립성이 떨어져 분할이 어려움)
① 데이터 분할 설계
- 단일 기기에서 최대 처리 가능한 유저의 수에 따라 소수(ex. 13, 17..) 개로 데이터 분할 partition
- 13개 파티션을 user_id 기준으로 정의한다면?
└ i번째 파티션은 user_id를 13으로 나눈 나머지가 i인 유저의 집단
└ ex) 14 % 13 = 1이므로, user_id가 14인 user는 1번 파티션에 할당
└ 분할 대상 칼럼이 숫자가 아니어도, 해싱을 통해 수치형으로 변환 가능
② 집계 데이터 저장
- 각 파티션의 데이터를 불러와, 필요한 연산을 수행한 뒤 최종 결과만을 집계하여 저장
└ 모든 분할된 데이터를 하나의 컴퓨터로 올릴 순 없어도, 집계 결과는 한 컴퓨터에서 확인 가능하기 때문
└ 각 유저와 상품에 대한 모든 이벤트를 포함한 전체 데이터보다 집계된 데이터가 훨씬 작아서 가능!
- 집계는 결과만 딱 남기는 것보다는, 중간 계산 과정이 있어야 추후 검사가 가능하므로 유저 단위로 남기는 게 좋음
#결과값만 남기는 것은 지양
{
"campaign_c":568244534.772,
"promotion_p":420385423.312,
"organic":15532259.246
}
# 유저별 데이터 등 중간 계산 과정 확인 가능하도록 저장
[
1:{
"campaign_c":200
"promotion_p":500
"organic":0
},
3:{
"campaign_c":1000
"promotion_p":1000
"organic":3000
},
...
③ Progress 관리하기: Supervisord
* 난이도가 높고 Window에서 활용하기 어려우니 개념만 참고
- Supervisord란?
└ 다양한 사유로 죽는 프로세스를 반복적으로 부활시켜 컴퓨터게 계속 돌게 하는 기능
└ 특정 배시 스크립트(ex. python3 main.py)를 실행하고, 해당 스크립트가 죽을 때 마다 다시 실행
- [참조]
- 설치 및 실행 시 참조
[설치]
- Linux: apt-get install supervisor
- Centos: yum install supervisord
- 모두 안될 시 pip 사용: pip install supervisord
└ config 셋업이 안돼서 비추
- unable to locate package 해결책 [참조]
- unix:///var/run/supervisor.sock no such file 해결
└ sudo service supervisor stop
└ sudo service supervisor start
[Config]
- vi /etc/supervisord/conf.d 이 폴더 안에 실행할 프로세스를 설정
└ 기본 스크립트: 파일이 없으면 만들어서 붙여넣기 (pip이나 yum으로 설치할 때 이럴 수 있음)
[unix_http_server]
file=/tmp/supervisor.sock ; the path to the socket file
[supervisord]
logfile=/tmp/supervisord.log ; main log file; default $CWD/supervisord.log
logfile_maxbytes=50MB ; max main logfile bytes b4 rotation; default 50MB
logfile_backups=10 ; # of main logfile backups; 0 means none, default 10
loglevel=info ; log level; default info; others: debug,warn,trace
pidfile=/tmp/supervisord.pid ; supervisord pidfile; default supervisord.pid
nodaemon=false ; start in foreground if true; default false
minfds=1024 ; min. avail startup file descriptors; default 1024
minprocs=200 ; min. avail process descriptors;default 200
[rpcinterface:supervisor]
supervisor.rpcinterface_factory = supervisor.rpcinterface:make_main_rpcinterface
[supervisorctl]
serverurl=unix:///tmp/supervisor.sock ; use a unix:// URL for a unix socket
[program:mysh]
command=sh my.sh ; the program (relative uses PATH, can take args)
process_name=%(program_name)s ; process_name expr (default %(program_name)s)
numprocs=1 ; number of processes copies to start (def 1)
directory=/tmp ; directory to cwd to before exec (def no cwd)
;umask=022 ; umask for process (default None)
;priority=999 ; the relative start priority (default 999)
;autostart=true ; start at supervisord start (default: true)
startsecs=1 ; # of secs prog must stay up to be running (def. 1)
startretries=3 ; max # of serial start failures when starting (default 3)
autorestart=true ; when to restart if exited after running (def: unexpected)
;exitcodes=0,2 ; 'expected' exit codes used with autorestart (default 0,2)
;stopsignal=QUIT ; signal used to kill process (default TERM)
;stopwaitsecs=10 ; max num secs to wait b4 SIGKILL (default 10)
;stopasgroup=false ; send stop signal to the UNIX process group (default false)
;killasgroup=false ; SIGKILL the UNIX process group (def false)
;user=chrism ; setuid to this UNIX account to run the program
;redirect_stderr=true ; redirect proc stderr to stdout (default false)
stdout_logfile=/tmp/mysh.log ; stdout log path, NONE for none; default AUTO
;stdout_logfile_maxbytes=1MB ; max # logfile bytes b4 rotation (default 50MB)
;stdout_logfile_backups=10 ; # of stdout logfile backups (0 means none, default 10)
;stdout_capture_maxbytes=1MB ; number of bytes in 'capturemode' (default 0)
;stdout_events_enabled=false ; emit events on stdout writes (default false)
stderr_logfile=/tmp/mysh.err ; stderr log path, NONE for none; default AUTO
;stderr_logfile_maxbytes=1MB ; max # logfile bytes b4 rotation (default 50MB)
;stderr_logfile_backups=10 ; # of stderr logfile backups (0 means none, default 10)
;stderr_capture_maxbytes=1MB ; number of bytes in 'capturemode' (default 0)
environment=ACTOR="kildong" ; process environment additions (def no adds)
- 설정 상세
└ 위의 conf폴더 안에 {process_name}.conf 형식으로 컨피그 파일 생성
└ 아래는 토큰 카운트 프로세스 예시
[program:token_count]
command = python /data/private/repos/llm-stork/src/token_count.py # 실행 스크립트
directory = /data/private/repos/llm-stork/src # 실행 디렉토리
user = root # 실행할 유저
stdout_logfile = /data/private/logs/token_count_larget.py # 로그 저장 위치
redirect_stderr = true # 에러 출력 여부
[program:program2]
...
[Supervisord 실행]
- 실행: sudo supervisord
- 프로세스 실행
└ 전부 실행: supervisorctl start all
└ 하나만 실행: supervisorctl start {program_name}
- supervisorctl로 들어가서 status나 start해도 됨
└ supervisorctl status라는 명령어는
└ supervisorctl로 들어가서 status를 치는 것과 같음
- 관련 명령어
└ status: 현상태 보기
└ start: 프로세스 시작
└ reload: 컨피그 다시 읽으며 재시작 → 설정한 뒤 이걸 해줘야함
└ stop: 프로세스 중단
└ restar`: 프로세스 재시작
- 잘 실행되는 예시
- 서버가 죽어서 꼬일 시
└ 인지 못한 사이에 죽었을 때, 포트가 물려있을 수 있다
└ 재구동하면 [Errno 98] Address already in use만 반복될 시 (이것은 fastapi의 예시)
└ netstat -tnlp로 사용 중인 포트 확인 후
└ 해당 포트에 물려있는 프로세스를 보고 (PID 확인)
└ kill -0 {PID}
3) Dask
① Dask란?
- 파이썬 한정 Spark 다음으로 유명한 분산 처리 도구 (파이썬 외에는 도구가 다양함)
- NumPy, Pandas 및 sklearn과 같은 Python 라이브러리를 쉽게 확장할 수 있어, 코드가 쉽고 학습 난이도가 낮음
└ 반대로 Spark를 쓸 때는 코드를 많이 바꿔야 함
- Spark보다는 비교적 가볍게 사용 가능하나 구조 자체는 유사함
🖥️ Pandas vs. Dask vs. Spark
- 흔히 중간 규모(1~100GB) 데이터를 Dask로 하라고 권장
- 단, 3개를 모두 공부해야 하므로 웬만하면 Pandas, 감당 안되면 Spark 사용도 괜찮은 방법
② Dask의 유용성: 싱글 머신으로 큰 파일을 읽을 때
- 수십 기가의 큰 파일을 꼭 읽어야 할 때, dask로 파일 쪼개는 게 유용
└ 보통 parquet는 2기가 짜리로 파일을 청킹(짤라내는 것)하나, 몇십 기가 수준으로 용량이 큰 경우엔 dask 사용 고려
└ dask로 파일을 쪼갠 뒤, pandas로 세부적인 전처리 및 분석
- 아래 코드와 같이 사용하면 메모리를 초과하는 파일도 처리 가능
from dask import dataframe as dd
from tqdm import tqdm
path = ## 파일 경로
df = dd.read_parquet(path)
df = df.repartition(npartitions=20) # 20개로 분할
for i in tqdm(range(df.npartitions)):
df.partitions[i].to_parquet(f"{i}.parquet")
③ Ray, modin, RAPIDs…
- 많은 분산처리 도구가 있고, 각기 장점이 있다고 하지만, Spark와 Pandas로 충분
- 보통 일부 기능이 구현돼 있지 않거나, 굳이 따로 공부하는 게 번거롭거나, 자잘한 오류가 많음
4) 자동화
- 스파크는 보통 주기적인 데이터 처리에 많이 쓰이기 때문에 스케쥴링 및 자동화를 함께 다루는 경우가 다수
① 자동화가 필요한 이유는?
└ Batch job:주기적인 대규모 데이터에 대한 처리
└ Streaming job: 실시간으로 발생하는 데이터에 대한 처리
② 스케쥴링과 Cron
- Cron은 가장 쉽고 단순한 스케쥴링 도구로 프트웨어 이름인 동시에, 스케쥴링 표현식의 이름이기도 함
* 윈도우에서는 사용이 다소 다르고, 복잡
- 쉽게 세팅할 수 있으나 단점이 많아 규모 있는 회사에서는 Airflow나 Jenkins를 많이 사용
└ cron job이 실패할 경우, 따로 알람이 오지는 않으며, 알림 세팅해도 프레세스 죽으면 끝
└ cron이 도는 서버 자체가 죽으면 확인이 어렵고, 로그 제외 히스토리 파악이 힘듦
└ GUI를 따로 제공하지 않아 명령어 기반으로 해야 하며, 로드 밸런싱을 포함한 자원 관리를 지원하지 않음
③ 모니터링과 히스토리 관리
- 주기적인 Batch job을 설정 후, 의도대로 수행되지 않았을 때 알림이 오도록 설정하는 것이 모니터링
└ 보통 슬랙 등 메신저로 수신하거나 자체 모니터링 도구 지원 툴이 있지만 수동 설정도 어렵지 않음
└ 다음은 일반적인 batch job 함수 예시
...
if __name__ == "__main__":
try:
main()
except Exception as e: #에러 메시지 보내기
post_to_slack(str(e), True)
- 완료된 job들이 언제 어떻게 수행됐는지 파악하는 것 또한 중요
└ 히스토리 관리를 위해 코드 단에서 어떻게 작업이 수행되었는지 기록logging하는 것이 중요
└ 다음은 파이썬의 logging 사용 예시
import logging
logging.basicConfig(
format='%(asctime)s %(levelname)s:%(message)s',
level=logging.DEBUG,
datefmt='%m/%d/%Y %I:%M:%S %p',
)
logging.debug("Sample debug message")
>> 04/23/2022 04:33:13 PM DEBUG:Sample debug message
'TIL' 카테고리의 다른 글
[240319] 스파크(spark): 배경 및 기본 구조, 기능 (1) | 2024.03.19 |
---|---|
[240319] 파이썬: 코드카타 44 & SQL: 코드카타 156~157 (0) | 2024.03.19 |
[240318] 파이썬: 코드카타 43 & SQL: 코드카타 149~155 (0) | 2024.03.18 |
[240315] 파이썬: 코드카타 42 & SQL: 코드카타 146~148 (0) | 2024.03.15 |
[240315] 스파크(spark): 컴퓨터와 데이터, 메모리, 클라우드 (1) | 2024.03.15 |