Lab: Apache Spark#
Exploring partitions, transformations, actions, etc.#
# Obtain the process-wide SparkContext: getOrCreate() returns the running
# context if one already exists, otherwise creates a new one with defaults.
from pyspark import SparkContext, SparkConf
sc = SparkContext.getOrCreate()
# Bare expression — in a notebook cell this displays the SparkContext summary.
sc
data = [1, 2, 3, 4, 5]

# parallelize() accepts an explicit partition count as its second argument;
# normally Spark chooses a sensible number of partitions automatically.
distData = sc.parallelize(data, 10)  # distribute across 10 partitions
# Notebook display of the RDD, e.g.:
#   ParallelCollectionRDD[3] at readRDDFromFile at PythonRDD.scala:287
distData

# reduce() is an action: it triggers the distributed computation and returns
# the aggregated value to the driver.
res = distData.reduce(lambda a, b: a + b)
res  # -> 15
Loading external datasets#
Distributed datasets can be loaded from local files, HDFS, S3, and any other storage supported by Hadoop.
# lines is lazy: nothing is loaded into memory yet — it is only a pointer
# to the file contents.
lines = sc.textFile('./printed.txt')
# map is a transformation: it records the computation but does not execute it.
lineLengths = lines.map(lambda l: len(l))  # PythonRDD[16] at RDD at PythonRDD.scala:53
# reduce is an action: applying it runs the per-partition work in parallel
# and performs the final aggregation.
# Only the final result is returned to the driver program.
totalLength = lineLengths.reduce(lambda a, b: a + b)
Deep learning#
(image classification) — a partial example.
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml import Pipeline
from sparkdl import DeepImageFeaturizer

# Transfer learning: a pretrained InceptionV3 network extracts image
# features, then a logistic regression classifier is trained on top of them.
featurizer = DeepImageFeaturizer(inputCol="image", outputCol="features", modelName="InceptionV3")
lr = LogisticRegression(maxIter=20, regParam=0.05, elasticNetParam=0.3, labelCol="label")
p = Pipeline(stages=[featurizer, lr])
# NOTE(review): train_images_df is not defined in this file — presumably a
# DataFrame of images and labels loaded elsewhere; confirm before running.
model = p.fit(train_images_df)  # train_images_df is a dataset of images and labels

# Inspect training error.
# BUG FIX: the original select() omitted the "prediction" column produced by
# model.transform(), so df.select("prediction", "label") below would raise
# an AnalysisException (column not found). Include it in the projection.
df = model.transform(train_images_df.limit(10)).select("prediction", "image", "probability", "uri", "label")
predictionAndLabels = df.select("prediction", "label")
evaluator = MulticlassClassificationEvaluator(metricName="accuracy")
print("Training set accuracy = " + str(evaluator.evaluate(predictionAndLabels)))