Lab: Apache Spark#
Checking partitions, transformations, actions, etc.#
from pyspark import SparkContext, SparkConf
# Get (or create) the shared SparkContext for this session.
sc = SparkContext.getOrCreate()
# Bare expression: in a notebook cell this displays the SparkContext summary.
sc
# Small local collection that will be distributed across the cluster below.
data = [1,2,3,4,5]
# The number of partitions can be set explicitly; by default Spark chooses it automatically.
# Distribute `data` as an RDD split into 10 partitions (overriding Spark's default).
distData = sc.parallelize(data, 10)
# Displays the RDD handle only — RDDs are lazy, nothing is computed yet.
distData
# => ParallelCollectionRDD[3] at readRDDFromFile at PythonRDD.scala:287
# reduce is an action: it runs the job across all partitions and sends the
# aggregated value back to the driver.
res = distData.reduce(lambda x, y: x + y)
res
# => 15
Loading external datasets#
Distributed datasets can be loaded from local files, HDFS, S3, and any other storage supported by Hadoop.
# `lines` is lazy: the file is not loaded into memory here — the RDD merely
# records a pointer to it.
lines = sc.textFile('./printed.txt')
# map is a transformation, so this too only builds a recipe; no computation runs.
lineLengths = lines.map(len)
# Taking the reduce action triggers the job: the work is computed in parallel
# across the partitions.
# Only the final result is returned to the driver program.
# Action: computes the per-line lengths in parallel and returns their sum to the driver.
totalLength = lineLengths.reduce(lambda x, y: x + y)
Deep learning#
(image classification) — this is only a partial example.
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml import Pipeline
from sparkdl import DeepImageFeaturizer
# Transfer learning: featurize images with a pre-trained InceptionV3 network,
# then fit a logistic-regression classifier on top of those features.
featurizer = DeepImageFeaturizer(inputCol="image", outputCol="features", modelName="InceptionV3")
lr = LogisticRegression(maxIter=20, regParam=0.05, elasticNetParam=0.3, labelCol="label")
p = Pipeline(stages=[featurizer, lr])
model = p.fit(train_images_df)  # train_images_df is a dataset of images and labels

# Inspect training error on a small sample.
# BUG FIX: "prediction" must be kept in this select — the original dropped it
# here and then tried to select it on the next line, which raises an
# AnalysisException (column "prediction" not found).
df = model.transform(train_images_df.limit(10)).select("image", "probability", "uri", "prediction", "label")
predictionAndLabels = df.select("prediction", "label")
evaluator = MulticlassClassificationEvaluator(metricName="accuracy")
print("Training set accuracy = " + str(evaluator.evaluate(predictionAndLabels)))