๐ป MapReduce ์ค์ต#
1) Docker ์ค์น#
MacOS ๊ธฐ์ค์ผ๋ก ์์ฑํ์์ต๋๋ค.
brew install cask
brew install docker --cask
2) HDFS/Spark Workbench Quick Start#
Container ๋์ฐ๊ธฐ
Hadoop, Spark๊น์ง ๋ชจ๋ ๋์ฐ๊ณ ์ถ๋ค๋ฉด
docker-compose up -d
Hadoop๋ง ๋์ฐ๊ณ ์ถ๋ค๋ฉด
docker-compose -f docker-compose-hive.yml up -d namenode docker-compose -f docker-compose-hive.yml up -d datanode
Container ํ์ธ
docker ps
Hadoop check
docker exec -it namenode /bin/bash
hadoop fs -ls /
(์ฒซ ์ค์น ํ์๋ ์๋ฌด ๊ฒ๋ ์๋๊ฒ ์ ์์ ๋๋ค!)
3) MapReduce ์ค์ต#
MapReduce ์ค์ต์ผ๋ก ๋ง์ด ํ์ฉ๋๋ WordCount๋ฅผ ์งํํด๋ณผ ์์ ์ ๋๋ค. WordCount๋ ๋ง ๊ทธ๋๋ก ํ ์คํธ์์ ๋ฑ์ฅํ๋ ๋จ์ด์ ๊ฐ์๋ฅผ ์ธ๋ ํ๋ก๊ทธ๋จ์ ๋๋ค. ๊ทธ๋ ๋ค๋ฉด ์ Hadoop ์ค์ต์ผ๋ก WordCount๋ฅผ ์งํํ ๊น์? Hadoop์ ๊ธฐ๋ณธ์ ์ผ๋ก ๋ถ์ฐ ๋ณ๋ ฌ ์ฒ๋ฆฌ ์์คํ ์ ๋๋ค. ๊ทธ๋์ ๋ถ์ฐ ์ ์ฅ๋ txt ํ์ผ์ ๊ฐ๊ฐ์ node์์ WordCount๋ฅผ ์งํํ ํ ๊ฒฐ๊ณผ๋ฅผ ๋ณด์ฌ์ฃผ๊ฒ ๋ฉ๋๋ค. ๋ฌผ๋ก ์ด๋ฒ ์ค์ต์์ datanode๋ฅผ ํ๋๋ง ๋์ ๊ธฐ ๋๋ฌธ์ ์ง์ ํ ์๋ฏธ์ ๋ถ์ฐ ์ฒ๋ฆฌ๋ฅผ ์ํํ ์ ์์ผ๋ Hadoop์ ๊ตฌ์กฐ์ MapReduce์ ์๋ ๋ฐฉ์์ ์ดํดํ๋๋ฐ๋ ๋ฌด๋ฆฌ๊ฐ ์์ ๊ฒ์ ๋๋ค.
๋จผ์ ์์ ๊ตฌ์ฑํ hadoop ํด๋ฌ์คํฐ์ datanode์ namenode๋ local machine์ volumes๊ณผ ์ฐ๊ฒฐ์ ํด๋์๋๋ฐ์, yml ํ์ผ์์ volume ๊ตฌ์ฑ์ด ./data/datanode:/hadoop/dfs/data
์ด๋ ๊ฒ ๋์ด์๋ ๊ฒ์ ๋ณผ ์ ์์ต๋๋ค. ์ด๋ local์์ ./data/datanode
์ ์์น์ ์ปจํ
์ด๋ ์์์ /hadoop/dfs/data
์ ๊ฒฝ๋ก๋ฅผ ์ฐ๋์ํจ ๊ฒ์
๋๋ค. ๋ฐ๋ผ์ ์ ํฌ๋ ํ์ฌ ๋๋ ํ ๋ฆฌ์์ cd ./data/datanode
๋ฅผ ํตํด ์์น๋ฅผ ์ด๋ํด์ค๋๋ค.
์ดํ ์ ํฌ๊ฐ ์ค์ต์ ์ฌ์ฉํ ํ์ผ์ ํ๋ ๋ง๋ค์ด ์ค๋๋ค!
words.txt
DE4E ์คํฐ๋ ๊ฐ์ง์ฐ๊ตฌ์ ๋ฐ์ ์คํฐ๋ ๋ชจ์ ๊ฐ์ง์ฐ๊ตฌ์ ๋จธ์ ๋ฌ๋ ๋ฐ์ดํฐ์ฌ์ด์ธ์ค ์ค์ฌ ๋ชจ์
์ด์ MapReduce ์ฝ๋๋ฅผ ์์ฑํ ์ฐจ๋ก์ ๋๋ค.
https://docs.google.com/presentation/d/1nIMJN3m9n9EEDtTSa_rRYwyAKlNUtQg6KSHTOfAVD_A/edit#slide=id.p
WordCount.java
(Apache Hadoop ๊ณต์ Tutorial)import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class WordCount { // Mapper ํด๋์ค ์ ์ public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>{ private final static IntWritable one = new IntWritable(1); private Text word = new Text(); // Map ๋ฉ์๋ ๊ตฌํ public void map(Object key, Text value, Context context ) throws IOException, InterruptedException { StringTokenizer itr = new StringTokenizer(value.toString()); // ์ ๋ ฅ ๋ฐ์ดํฐ๋ฅผ ํ ํฐํ while (itr.hasMoreTokens()) { // ํ ํฐ์ด ์กด์ฌํ๋ ๋์ ๋ฐ๋ณต word.set(itr.nextToken()); // ํ ํฐ์ Text ๊ฐ์ฒด์ ์ ์ฅ context.write(word, one); // ์ถ๋ ฅ ํค/๊ฐ ์ ์์ฑ ํ ์ถ๋ ฅ } } } // Reducer ํด๋์ค ์ ์ public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> { private IntWritable result = new IntWritable(); // Reduce ๋ฉ์๋ ๊ตฌํ public void reduce(Text key, Iterable<IntWritable> values, Context context ) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { // ๋์ผํ ํค๋ฅผ ๊ฐ์ง ๊ฐ๋ค์ ํฉ์ฐ sum += val.get(); } result.set(sum); // ํฉ์ฐ ๊ฒฐ๊ณผ๋ฅผ IntWritable ๊ฐ์ฒด์ ์ ์ฅ context.write(key, result); // ์ถ๋ ฅ ํค/๊ฐ ์ ์์ฑ ํ ์ถ๋ ฅ } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); // Hadoop ์ค์ ์ ๋ณด ๊ฐ์ฒด ์์ฑ Job job = Job.getInstance(conf, "word count"); // Job ๊ฐ์ฒด ์์ฑ job.setJarByClass(WordCount.class); // Job ํด๋์ค ์ค์ job.setMapperClass(TokenizerMapper.class); // Mapper ํด๋์ค ์ค์ job.setCombinerClass(IntSumReducer.class); // Combiner ํด๋์ค ์ค์ job.setReducerClass(IntSumReducer.class); // Reducer ํด๋์ค ์ค์ job.setOutputKeyClass(Text.class); // ์ถ๋ ฅ ํค ํด๋์ค ์ค์ job.setOutputValueClass(IntWritable.class); // ์ถ๋ ฅ ๊ฐ ํด๋์ค ์ค์ FileInputFormat.addInputPath(job, new Path(args[0])); // ์ ๋ ฅ ํ์ผ ๊ฒฝ๋ก ์ค์ FileOutputFormat.setOutputPath(job, new Path(args[1])); // ์ถ๋ ฅ ๋๋ ํ ๋ฆฌ ๊ฒฝ๋ก ์ค์ System.exit(job.waitForCompletion(true) ? 0 : 1); // Job ์คํ ํ ์ข ๋ฃ } }
๋ ์์ธํ
private final static IntWritable one = new IntWritable(1);
์ด ์ฝ๋๋ IntWritable ํ์ ์ ๋ณ์ one์ ์์ฑํ๊ณ , ์ด ๋ณ์์ ์ด๊ธฐ๊ฐ์ 1๋ก ์ค์ ํฉ๋๋ค. IntWritable์ Hadoop์์ ์ฌ์ฉ๋๋ ์ ์ํ ํ์ ์ด๋ฉฐ, MapReduce ์์ ์์ ์ถ๋ ฅํ ๊ฐ์ ์ ์ฅํ๋ ๋ฐ ์ฌ์ฉ๋ฉ๋๋ค. ์ด ์ฝ๋๋ WordCount ์์ ์์ Mapper ํด๋์ค์์ ์ถ๋ ฅํ ๊ฐ์ ์ด๊ธฐ๊ฐ์ผ๋ก ์ฌ์ฉ๋ฉ๋๋ค. Mapper ํด๋์ค๋ ์ ๋ ฅ ๋ฐ์ดํฐ๋ฅผ ํ ํฐํํ๊ณ , ๊ฐ ๋จ์ด๋ณ๋ก (๋จ์ด, 1)์ ํํ๋ก ์ถ๋ ฅ์ ์์ฑํฉ๋๋ค. ์ด๋ 1์ด๋ผ๋ ๊ฐ์ IntWritable ํ์ ์ ๋ณ์ one์ ์ ์ฅํ๊ณ , ์ด ๋ณ์๋ฅผ ์ถ๋ ฅ ๊ฐ์ผ๋ก ์ฌ์ฉํฉ๋๋ค. ์ฆ, Mapper์์ ์์ฑ๋๋ ์ถ๋ ฅ ๊ฐ์ (๋จ์ด, one)์ ํํ๋ก ์์ฑ๋ฉ๋๋ค.
// Map ๋ฉ์๋ ๊ตฌํ public void map(Object key, Text value, Context context ) throws IOException, InterruptedException { StringTokenizer itr = new StringTokenizer(value.toString()); // ์ ๋ ฅ ๋ฐ์ดํฐ๋ฅผ ํ ํฐํ while (itr.hasMoreTokens()) { // ํ ํฐ์ด ์กด์ฌํ๋ ๋์ ๋ฐ๋ณต word.set(itr.nextToken()); // ํ ํฐ์ Text ๊ฐ์ฒด์ ์ ์ฅ context.write(word, one); // ์ถ๋ ฅ ํค/๊ฐ ์ ์์ฑ ํ ์ถ๋ ฅ } }
์ ์ฝ๋๋ Mapper ํด๋์ค์ map ๋ฉ์๋๋ฅผ ๊ตฌํํ ๊ฒ์ ๋๋ค.
์ด ๋ฉ์๋๋ ์ ๋ ฅ ๋ฐ์ดํฐ๋ฅผ ํ ํฐํํ์ฌ ํ ํฐ์ด ์กด์ฌํ๋ ๋์ ํ ํฐ์ ํ๋์ฉ ๊ฐ์ ธ์์ ํค-๊ฐ ์์ ์์ฑํ๊ณ ์ถ๋ ฅํฉ๋๋ค.
์ฌ๊ธฐ์๋ ์ ๋ ฅ ๊ฐ์ผ๋ก ๋ค์ด์จ Text ๊ฐ์ฒด๋ฅผ StringTokenizer ํด๋์ค๋ฅผ ์ฌ์ฉํ์ฌ ๊ณต๋ฐฑ์ ๊ธฐ์ค์ผ๋ก ํ ํฐํํ๊ณ , while๋ฌธ์ ์ด์ฉํ์ฌ ํ ํฐ์ด ์กด์ฌํ๋ ๋์ ๋ฐ๋ณต์ ์ผ๋ก ์ฒ๋ฆฌํฉ๋๋ค.
๊ทธ๋ฆฌ๊ณ ๊ฐ๊ฐ์ ํ ํฐ์ word ๋ณ์์ ์ ์ฅํ ํ, context.write ๋ฉ์๋๋ฅผ ์ฌ์ฉํ์ฌ ์ถ๋ ฅ ํค-๊ฐ ์์ ์์ฑํฉ๋๋ค.
์ด๋, ์ถ๋ ฅ ๊ฐ์ผ๋ก๋ Mapper ํด๋์ค ๋ด๋ถ์ ๋ฏธ๋ฆฌ ์ ์ธ๋ one ๋ณ์๋ฅผ ์ฌ์ฉํฉ๋๋ค. ์ด ๋ณ์๋ IntWritable ํด๋์ค ํ์ ์ ๊ฐ์ฒด๋ก, ๊ทธ ๊ฐ์ 1๋ก ์ด๊ธฐํ๋ฉ๋๋ค. ๋ฐ๋ผ์ ๋ชจ๋ ์ถ๋ ฅ ๊ฐ์ 1์ด ๋ฉ๋๋ค.
์ด์ datanode์ ์ ์ํด์ ์์ ๋ง๋ค์ด๋ ํ์ผ์ HDFS์ ์ฌ๋ฆฌ๊ณ ์์ฑํด๋ MapReduce ํ๋ก๊ทธ๋จ์ ์คํํด๋ณด๋ ค ํฉ๋๋ค. ๊ทธ๋ฌ๊ธฐ ์ํด์ ๋จผ์ datanode ์ปจํ ์ด๋ ์์ ์์ฑํด๋ ํ์ผ์ด ์๋ ์์น๋ก ์ด๋์ ํด์ผํฉ๋๋ค.
docker exec -it docker-hadoop-spark-workbench_datanode_1 /bin/bash
cd /hadoop/dfs/data
ls
(ํ์๋ ์ด๋ฏธ ํ๋ฒ ์คํํ๊ธฐ ๋๋ฌธ์ class ํ์ผ์ด๋ jar ํ์ผ ๋ฑ ์ฌ๋ฌ ์ก๋คํ ํ์ผ์ด ๋ง์ด ์์ต๋๋ค..!)
์ด๋ค ๋๋ ํ ๋ฆฌ์์๋ hadoop ๋ช ๋ น์ด๋ฅผ ์คํํ๊ธฐ ์ํด์ java์ hadoop ๊ด๋ จ ํ๊ฒฝ ๋ณ์๋ฅผ ์ค์ ํด์ฃผ์ด์ผ ํฉ๋๋ค! ๊ฐ๋จํ๊ฒ ๋ค์์ ๋ช ๋ น์ด๋ฅผ ์ ๋ ฅํ๋ฉด ๋ฉ๋๋ค!
export PATH=${JAVA_HOME}/bin:${PATH}
export HADOOP_CLASSPATH=${JAVA_HOME}/lib/tools.jar
์ด์ class ํ์ผ์ ๋ง๋ค๊ณ ์ด๋ฅผ jar ํ์ผ๋ก ํจํค์ง ํ๋ ์์ ๋ง ํ๋ฉด ์ค๋น๋ ๋๋ฌ์ต๋๋ค!
hadoop com.sun.tools.javac.Main WordCount.java
jar cf wc.jar WordCount*.class
โhadoop com.sun.tools.javac.Mainโ์ Hadoop์์ ์ ๊ณตํ๋ ์๋ฐ ์ปดํ์ผ๋ฌ๋ฅผ ์ฌ์ฉํ์ฌ ์์ค ์ฝ๋๋ฅผ ์ปดํ์ผํ๋ ๋ช ๋ น์ด์ ๋๋ค.
โWordCount.javaโ๋ ์ปดํ์ผํ๋ ค๋ ์๋ฐ ํ์ผ์ ์ด๋ฆ์ ๋๋ค. ์ด ๋ช ๋ น์ด๋ฅผ ์คํํ๋ฉด โWordCount.classโ๋ผ๋ ๋ฐ์ด๋๋ฆฌ ํ์ผ์ด ์์ฑ๋ฉ๋๋ค. ์ด ํ์ผ์ MapReduce ์์ ์ ์คํํ๋ ๋ฐ ํ์ํฉ๋๋ค.
์! ์ด์ HDFS์ ์ฌ๋ฆด ์๊ฐ์ ๋๋ค.
hadoop fs -put ./words.txt /
์ดํ ์์ ๋ง๋ค์ด๋ jar ํ์ผ์ ์คํํ๋ฉด ๋ฉ๋๋ค.
hadoop jar wc.jar WordCount /words.txt /output
โ ์์ ์ด ์ํ๋๊ณ ์์ต๋๋ค!
output ํ์ผ์ ์ด์ด๋ณด๋ฉด ๋ค์๊ณผ ๊ฐ์ ๊ฒฐ๊ณผ๋ฅผ ๋ณด์ค ์ ์์ต๋๋ค!
hadoop fs -ls /output
hadoop fs -cat /output/part-r-00000