πŸ’» Hands-on: Apache Spark#

Checking partitions, transformations, actions, and more#

from pyspark import SparkContext, SparkConf
sc = SparkContext.getOrCreate()
sc

SparkContext (Spark UI)

Version: v3.4.1
Master: local[*]
AppName: pyspark-shell
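SparkConf is imported above but never used; as a minimal sketch, a context can also be configured explicitly instead of relying on defaults. The app name "partition-demo" is an arbitrary illustrative choice, and note that getOrCreate returns an already-running context if one exists, in which case the new conf is ignored.

conf = SparkConf().setAppName("partition-demo").setMaster("local[*]")
# If a SparkContext already exists (as it does here), getOrCreate returns it
# and this conf is ignored
sc = SparkContext.getOrCreate(conf)
print(sc.master, sc.appName)    # e.g. local[*] pyspark-shell (the existing shell context)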
data = [1, 2, 3, 4, 5]
# The number of partitions can be set explicitly; Spark normally chooses it automatically
distData = sc.parallelize(data, 10)     # run with 10 partitions
distData
ParallelCollectionRDD[3] at readRDDFromFile at PythonRDD.scala:287
res = distData.reduce(lambda a, b: a + b)
res
15
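To verify the partitioning, a quick sketch: getNumPartitions() reports the partition count, and glom() is a transformation that wraps each partition's elements in a list, so collect() (an action) reveals how the data was split.

distData.getNumPartitions()     # 10, as requested above
# glom() is a transformation; collect() is the action that actually runs the job
distData.glom().collect()       # e.g. [[], [1], [], [2], [], [3], [], [4], [], [5]]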

Loading external datasets#

  • 둜컬 파일, HDFS, S3 λ“± ν•˜λ‘‘μ΄ μ§€μ›ν•˜λŠ” μŠ€ν† λ¦¬μ§€λ‘œλΆ€ν„° λΆ„μ‚° 데이터셋을 뢈러올 수 μžˆμŠ΅λ‹ˆλ‹€.

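The paths below are placeholders sketching the supported URI schemes, not files that exist in this project:

# Illustrative placeholders only; none of these paths are real
local_rdd = sc.textFile("file:///tmp/printed.txt")          # local filesystem
hdfs_rdd = sc.textFile("hdfs://namenode:9000/data/logs")    # HDFS
s3_rdd = sc.textFile("s3a://my-bucket/data/logs")           # S3 via the s3a connector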
# lines is not loaded into memory here; it is only a pointer to the file
lines = sc.textFile('./printed.txt')

# Applying the map transformation yields a new RDD that is not yet computed (lazy)
lineLengths = lines.map(lambda l: len(l)) # PythonRDD[16] at RDD at PythonRDD.scala:53

# The reduce action triggers the job, computing across partitions in parallel
# Only the final result is returned to the driver program
totalLength = lineLengths.reduce(lambda a, b: a + b)
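If lineLengths were needed again later, it could be persisted before the first action so the map is not recomputed on every pass, as the Spark programming guide suggests; a minimal sketch:

from pyspark import StorageLevel

# Keep the mapped RDD in memory after the first action computes it
lineLengths.persist(StorageLevel.MEMORY_ONLY)
totalLength = lineLengths.reduce(lambda a, b: a + b)
maxLength = lineLengths.max()   # reuses the cached partitions instead of re-reading the file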

Deep learning#

A partial example of image classification via transfer learning with the sparkdl (Deep Learning Pipelines) package: InceptionV3 featurizes the images, and a logistic regression model is trained on those features.

from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml import Pipeline
from sparkdl import DeepImageFeaturizer

featurizer = DeepImageFeaturizer(inputCol="image", outputCol="features", modelName="InceptionV3")
lr = LogisticRegression(maxIter=20, regParam=0.05, elasticNetParam=0.3, labelCol="label")
p = Pipeline(stages=[featurizer, lr])

model = p.fit(train_images_df)    # train_images_df is a dataset of images and labels

# Inspect training error
df = model.transform(train_images_df.limit(10)).select("image", "probability", "prediction", "uri", "label")
predictionAndLabels = df.select("prediction", "label")
evaluator = MulticlassClassificationEvaluator(metricName="accuracy")
print("Training set accuracy = " + str(evaluator.evaluate(predictionAndLabels)))