💻 MapReduce Hands-On#

1) Installing Docker#

  • This guide was written for macOS.

brew install --cask docker
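
Once Docker Desktop is installed and running, you can sanity-check the setup with the version commands (the compose walkthrough below uses the classic docker-compose CLI):

docker --version
docker-compose --version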

2) HDFS/Spark Workbench Quick Start#

  • Starting the containers

    • To bring up everything, including Hadoop and Spark:

      docker-compose up -d
      
    • To bring up Hadoop only:

      docker-compose -f docker-compose-hive.yml up -d namenode
      docker-compose -f docker-compose-hive.yml up -d datanode
      
  • Checking the containers

docker ps

[Screenshot: docker ps output]

  • Hadoop check

docker exec -it namenode /bin/bash
hadoop fs -ls /

[Screenshot: hadoop fs -ls / output]

(Right after the first install, it is normal to see nothing here!)

3) MapReduce Hands-On#

We will use WordCount, the classic MapReduce exercise. WordCount is exactly what it sounds like: a program that counts how many times each word appears in a text. So why run WordCount for Hadoop practice? Hadoop is, at its core, a distributed parallel processing system: each node runs WordCount over its portion of the distributed text file, and the partial results are then combined into the final output. Since we brought up only a single datanode in this exercise, we cannot do distributed processing in the true sense, but it is still plenty to understand Hadoop's structure and how MapReduce works. A simplified trace of this flow follows below.

[Screenshot]
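
To make this concrete, here is a simplified trace of a tiny input through the three phases (assuming a single mapper and a single reducer):

input:   "hello world hello"
map:     (hello, 1) (world, 1) (hello, 1)
shuffle: (hello, [1, 1]) (world, [1])
reduce:  (hello, 2) (world, 1)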

First, recall that the datanode and namenode of the Hadoop cluster we set up are bound to volumes on the local machine: in the yml file you can see the volume configured as ./data/datanode:/hadoop/dfs/data. This maps the local path ./data/datanode to the path /hadoop/dfs/data inside the container. So, from the current directory, move into that folder with cd ./data/datanode.

Next, create the file we will use for the exercise (a minimal command sketch follows the file contents below):

  • words.txt

    DE4E 스터디 가짜연구소 데엔 스터디 모임
    가짜연구소 머신러닝 데이터사이언스 중심 모임
    
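Putting these two steps together, a minimal sketch (assuming you start from the directory that holds the docker-compose yml; any text editor works in place of vi):

cd ./data/datanode
vi words.txt   # paste the two lines above and save
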

Now it's time to write the MapReduce code.

https://docs.google.com/presentation/d/1nIMJN3m9n9EEDtTSa_rRYwyAKlNUtQg6KSHTOfAVD_A/edit#slide=id.p

  • WordCount.java (from the official Apache Hadoop tutorial)

    import java.io.IOException;
    import java.util.StringTokenizer;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class WordCount {

      // Mapper class definition
      public static class TokenizerMapper
           extends Mapper<Object, Text, Text, IntWritable>{

        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        // map method implementation
        public void map(Object key, Text value, Context context
                        ) throws IOException, InterruptedException {
          StringTokenizer itr = new StringTokenizer(value.toString());  // tokenize the input line
          while (itr.hasMoreTokens()) {  // loop while tokens remain
            word.set(itr.nextToken());  // store the token in the Text object
            context.write(word, one);  // emit the output key/value pair
          }
        }
      }

      // Reducer class definition
      public static class IntSumReducer
           extends Reducer<Text,IntWritable,Text,IntWritable> {
        private IntWritable result = new IntWritable();

        // reduce method implementation
        public void reduce(Text key, Iterable<IntWritable> values,
                           Context context
                           ) throws IOException, InterruptedException {
          int sum = 0;
          for (IntWritable val : values) {  // sum the values that share this key
            sum += val.get();
          }
          result.set(sum);  // store the sum in the IntWritable
          context.write(key, result);  // emit the output key/value pair
        }
      }

      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();  // create the Hadoop configuration object

        Job job = Job.getInstance(conf, "word count");  // create the Job
        job.setJarByClass(WordCount.class);  // set the jar via the driver class
        job.setMapperClass(TokenizerMapper.class);  // set the Mapper class
        job.setCombinerClass(IntSumReducer.class);  // set the Combiner class
        job.setReducerClass(IntSumReducer.class);  // set the Reducer class
        job.setOutputKeyClass(Text.class);  // set the output key class
        job.setOutputValueClass(IntWritable.class);  // set the output value class

        FileInputFormat.addInputPath(job, new Path(args[0]));  // input file path
        FileOutputFormat.setOutputPath(job, new Path(args[1]));  // output directory path

        System.exit(job.waitForCompletion(true) ? 0 : 1);  // run the job, then exit
      }
    }
    
    • ๋” ์ž์„ธํžˆ

       private final static IntWritable one = new IntWritable(1);
      

      This line creates a variable named one of type IntWritable, initialized to 1. IntWritable is Hadoop's serializable integer type, used to hold values emitted by MapReduce tasks. In the WordCount example it is the value the Mapper emits: the Mapper tokenizes its input and produces output of the form (word, 1), storing the 1 in one and passing that variable as the output value. In other words, every pair the Mapper emits has the form (word, one). Declaring one as a single static final instance also means the same object is reused for every token rather than allocating a fresh IntWritable each time.

       // map method implementation
          public void map(Object key, Text value, Context context
                          ) throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());  // tokenize the input line
            while (itr.hasMoreTokens()) {  // loop while tokens remain
              word.set(itr.nextToken());  // store the token in the Text object
              context.write(word, one);  // emit the output key/value pair
            }
          }
      

      The code above is the implementation of the Mapper class's map method.

      It tokenizes the input data and, while tokens remain, takes them one at a time, builds a key-value pair, and emits it.

      Specifically, the incoming Text value is split on whitespace with StringTokenizer, and the while loop keeps processing as long as tokens remain.

      Each token is then stored in the word variable, and context.write emits the output key-value pair.

      The output value is the one variable declared earlier in the Mapper class, an IntWritable initialized to 1, so every value emitted here is 1.
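
      One more detail worth noting: in main, the same IntSumReducer is also registered as a Combiner via job.setCombinerClass. A combiner runs on the map side and pre-aggregates the (word, 1) pairs locally before the shuffle, which cuts down the data sent over the network; since word counts are plain sums, reusing the reducer as a combiner is safe here.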

Now let's connect to the datanode, upload the file we made to HDFS, and run the MapReduce program we wrote. To do that, first move to the location inside the datanode container where the file lives.

docker exec -it docker-hadoop-spark-workbench_datanode_1 /bin/bash
cd /hadoop/dfs/data
ls

[Screenshot: ls output inside the datanode container]

(I have already run through this once, so my directory is littered with .class files, jar files, and other odds and ends!)

To compile the code and run hadoop commands from any directory, you first need to set the Java- and Hadoop-related environment variables. Just enter the following:

export PATH=${JAVA_HOME}/bin:${PATH}
export HADOOP_CLASSPATH=${JAVA_HOME}/lib/tools.jar

Now all that is left is to compile the class files and package them into a jar, and the preparation is done!

hadoop com.sun.tools.javac.Main WordCount.java
jar cf wc.jar WordCount*.class
  • "hadoop com.sun.tools.javac.Main" compiles the source using the Java compiler from the JDK's tools.jar (which we put on HADOOP_CLASSPATH above), with the Hadoop libraries already on the classpath.

  • "WordCount.java" is the source file to compile. Running the command produces the binary "WordCount.class", plus one .class file per inner class, which is why the jar step packages "WordCount*.class". These files are needed to run the MapReduce job.
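
If you want to verify that both steps worked, an ls at this point should show something like the following (file names assuming the tutorial's class names):

ls
# WordCount.java  WordCount.class  WordCount$TokenizerMapper.class
# WordCount$IntSumReducer.class  wc.jar  words.txt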

์ž! ์ด์ œ HDFS์— ์˜ฌ๋ฆด ์‹œ๊ฐ„์ž…๋‹ˆ๋‹ค.

hadoop fs -put ./words.txt /
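
To double-check the upload, you can list the HDFS root:

hadoop fs -ls /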

[Screenshot: words.txt uploaded to HDFS]

Then run the jar file we built earlier.

hadoop jar wc.jar WordCount /words.txt /output

[Screenshot: MapReduce job log output]

→ The job is running!
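
One caveat if you rerun the job: FileOutputFormat refuses to write to an output directory that already exists, so a second run against /output will fail until you delete it first:

hadoop fs -rm -r /output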

If you look at the output directory, you can see the following result!

hadoop fs -ls /output
hadoop fs -cat /output/part-r-00000
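
Given our words.txt, the contents of part-r-00000 should come out roughly as follows (tab-separated word/count pairs in the actual file, sorted by key):

DE4E            1
가짜연구소      2
데엔            1
데이터사이언스  1
머신러닝        1
모임            2
스터디          2
중심            1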

[Screenshot: hadoop fs -cat /output/part-r-00000 output]