💻 Hands-on: Apache Spark#

Checking partitions, transformations, actions, and more#

from pyspark import SparkContext, SparkConf
sc = SparkContext.getOrCreate()
sc

SparkContext
Version: v3.4.1
Master: local[*]
AppName: pyspark-shell

data = [1,2,3,4,5]
# The number of partitions can be set explicitly; by default Spark chooses it automatically.
distData = sc.parallelize(data, 10)     # split the data into 10 partitions
distData
ParallelCollectionRDD[3] at readRDDFromFile at PythonRDD.scala:287
res = distData.reduce(lambda a, b : a + b)
res
15
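
To make the partitioning step concrete, here is a plain-Python sketch (it does not use Spark) of how five elements might be spread over ten partitions and then reduced. The helper names `split_into_partitions` and `partitioned_reduce` are hypothetical, and the index-based slicing rule is an assumption for illustration; Spark's actual placement of elements may differ. In a live session, `distData.getNumPartitions()` and `distData.glom().collect()` show the real layout.

```python
from functools import reduce


def split_into_partitions(data, num_partitions):
    """Slice data into num_partitions contiguous chunks (some may be empty)."""
    n = len(data)
    return [
        data[i * n // num_partitions:(i + 1) * n // num_partitions]
        for i in range(num_partitions)
    ]


def partitioned_reduce(partitions, func):
    """Reduce each non-empty partition, then combine the partial results."""
    partials = [reduce(func, part) for part in partitions if part]
    return reduce(func, partials)


# Same data and partition count as the cell above (assumed slicing rule).
partitions = split_into_partitions([1, 2, 3, 4, 5], 10)
total = partitioned_reduce(partitions, lambda a, b: a + b)
print(len(partitions), total)
```

With more partitions than elements, some partitions are simply empty. The combined result does not depend on how the elements are grouped because addition is associative and commutative, which is what `reduce` expects of the function it is given.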

Loading external datasets#

  • Distributed datasets can be loaded from any storage source supported by Hadoop, such as local files, HDFS, and S3.

# lines is not loaded into memory yet; it is only a pointer to the file
lines = sc.textFile('./printed.txt')

# Result of applying the map transformation (not computed yet)
lineLengths = lines.map(lambda l: len(l)) # PythonRDD[16] at RDD at PythonRDD.scala:53

# The reduce action triggers the computation, which runs in parallel;
# only the final result is returned to the driver program
totalLength = lineLengths.reduce(lambda a, b: a+b)
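
The lazy behaviour described in the comments above can be observed without a cluster. The sketch below is an analogy in plain Python, not Spark code: Python's built-in `map` is also lazy, so the counting wrapper `counted_len` (a hypothetical helper) is not called until `reduce` consumes the values. The sample `lines` list is an assumed stand-in for the contents of a text file.

```python
from functools import reduce

call_log = []  # records every line that counted_len actually processes


def counted_len(line):
    """Return len(line) and record that the function really ran."""
    call_log.append(line)
    return len(line)


lines = ["spark", "is", "lazy"]          # stand-in for the lines of a file

line_lengths = map(counted_len, lines)   # "transformation": nothing runs yet
calls_before = len(call_log)             # no calls so far

total_length = reduce(lambda a, b: a + b, line_lengths)  # "action": runs now
calls_after = len(call_log)              # one call per line

print(calls_before, calls_after, total_length)
```

In Spark, an RDD such as `lineLengths` is recomputed each time an action uses it unless it is persisted first, for example with `lineLengths.persist()`.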

Deep learning#

(image classification) - This is only a partial example.

from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml import Pipeline
from sparkdl import DeepImageFeaturizer

featurizer = DeepImageFeaturizer(inputCol="image", outputCol="features", modelName="InceptionV3")
lr = LogisticRegression(maxIter=20, regParam=0.05, elasticNetParam=0.3, labelCol="label")
p = Pipeline(stages=[featurizer, lr])

model = p.fit(train_images_df)    # train_images_df is a dataset of images and labels

# Inspect training error
# "prediction" must be kept in the select, since the evaluator reads it below
df = model.transform(train_images_df.limit(10)).select("image", "probability", "prediction", "uri", "label")
predictionAndLabels = df.select("prediction", "label")
evaluator = MulticlassClassificationEvaluator(metricName="accuracy")
print("Training set accuracy = " + str(evaluator.evaluate(predictionAndLabels)))