# PySpark
df = spark.read.options(header=True, inferSchema=True).csv('sample.csv') # load csv
df.show() # show df
df.show(10)
df.columns # show columns
df.dtypes
df.toDF('a', 'b', 'c') # column name 변경
df.withColumnRenamed('old', 'new')
df.drop('mpg')
df[df.mpg > 20] # filtering
df[(df.mpg > 20) & (df.cyl == 6)] # operation
import pyspark.sql.functions as F
df.withColumn('logdisp', F.log(df.log(df.disp)) # transformation
# numpy 쓸 수 있지만 되도록 built-in function 사용할 것
df.sample(False, 0.1).toPandas().hist() # histogram
두 가지 문법이 완전히 똑같은 것도 있고 약간 다른 것도 있으나, pandas에 익숙한 파이썬 유저라면 쉽게 사용이 가능할 것 같다.
PySpark는 히스토그램 기능을 제공하지 않지만, 판다스로 변환 후 사용이 가능하다. 그말인 즉슨 pandas에서 할 수 있는 것을 모두 PySpark로 변환해서 사용이 가능하다는 뜻. 하지만 데이터의 크기가 클 경우 메모리가 run out 될 가능성이 있다.
PySpark 사용시 주의할 점
반드시 built-in function을 이용할 것
파이썬과 드라이버 클러스터에 있는 패키지를 같은 버전으로 사용할 것
built-in UI 확인할 것 http://localhost:4040
SSH port forwarding
Spark MLlib를 적극 활용할 것 (사이킷런과 거의 비슷)
모든 로우에 대해 반복문을 실행하지 마라(pandas는 할 수는 있음)
하드코딩 하지 말고 spark-submit을 이용해라
필터링하고 판다스로 변환할 경우 필터링 후 변환해야함(df.limit(5).toPandas())
data in JVM
PySpark를 사용할 때 스파크의 executors는 파이썬을 전혀 사용하지 않는다. 데이터는 JVM안에 있기 때문에 그런데, 일단 스파크는 단일 스레드에서 실행되는 것이 아니다. 따라서 스파크가 단일 스레드에서 실행될 경우 Pandas가 훨씬 더 좋은 성능을 내는 상황이 있다. 그러므로 스파크는 여러 컴퓨터 및 여러 작업을 실행하는 각 컴퓨터에서 사용할 때 빛이난다 ✨
SQL
Pandas는 native SQL syntax을 지원하지 않지만 PySpark는 지원한다. 때문에 SQL 데이터를 데이터 프레임으로 바꿀 때는 PySpark 가 더 좋다
df.createOrReplaceTempView('foo')
df2 = spark.sql('select * from foo')
일단 Spark는 'Lazy' 하다는 것을 기억해야 한다. Lazy Evaluation은 output을 내야할 때 모든 operation 이 일어나고 그 전에는 아무일도 하지 않는 다는 것을 의미한다. 위 코드의 1번째 줄인 상태는 아직 Reference만 만든 것 뿐 아무 연산이 일어나지 않은 것이다.
Pands PySpark Koalas
그리고 판다스랑 스파크를 선택해야 하는게 아니라 이 둘을 조화롭게 사용하는 것이 좋다고 한다. 거기다 databricks의 Koalas 프레임워크까지 곁들이면 더 좋다고 한다. 참고링크 많은 양의 데이터일 경우 pandas는 비효율적일 수 있는데 이를 위해 Koalas라는 라이브러리가 나왔다고 한다.
영화 평점 데이터를 로드하는 데서 이들의 차이를 살펴보자
# PySpark
from pyspark.sql.functions import col
movies_with_ratings = movies_df.join(ratings_df, movies_df.movieId == ratings_df.movieId)
(movies_with_ratings
.groupBy("title")
.count()
.orderBy(col("count").desc())
.show(10, False))
# Spark SQL
spark.sql("""
SELECT title, COUNT(*) as CNT
FROM movies
LEFT JOIN ratings ON movies.movieId = ratings.movieID
GROUP BY title
ORDER BY CNT DESC
LIMIT 10
""").show(10,False)