zeun ParkJieun's Dev Log data-engineer & backend developer

Hadoop - MapReduce

» hadoop

하둡 완벽 가이드: 2장 맵리듀스

맵리듀스란?

디스크에서 데이터를 읽고 쓰는 문제를 키-값 쌍의 계산으로 변환한 추상화 된 프로그래밍 모델

대표적인 대용량 데이터 처리를 위한 병렬 처리 기법

mapreduce

맵리듀스를 활용하여 기상 데이터 셋 분석하기

목적 및 데이터 포맷

연도별 전 세계 최고 기온 구하기

gzip 압축 파일, 구분자 없이 필드가 한 행으로 붙어 있음

유닉스 도구로 데이터 분석 하기

for year in all/*
do
	echo -ne `basename $year .gz`"\t"
gunzip -c $year | \
	awk '{ temp = substr($0, 88, 5) + 0;
				 q = substr($0, 93, 1);
				 if (temp !=9999 && q ~ /[01459]/ && temp > max) max = temp }
			END { print max }'
done

전체 데이터 EC2 고성능 인스턴스에서 실행해 본 결과 42분이 걸림

하둡으로 데이터 분석하기

1) 맵 단계

입력: 원본 데이터

맵 함수 역할: 각 행에서 연도와 기온을 추출, 잘못된 레코드를 걸러주는 작업

출력: 연도와 기온 (ex) (1950, 0) (1950, 22) (1951, 27) …

key : 연도

value : 기온

2) 리듀스 단계

입력: 연도별로 측정된 모든 기온 값(리스트 형식) (ex) (1949, [111,78,22,24]) (1950, [0,22,-5]) …

리듀스 함수 역할: 리스트 전체를 반복해 최고 기온 값을 추출

출력: 연도와 최고 기온 (ex) (1949, 111) ( 1950, 22) …

자바 맵리듀스

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public class MaxTemperatureMapper extends Mapper<T> {
	//Mapper : 4개의 정규 타입 매개변수를 가짐(입력키, 입력값, 출력키, 출력값)
	@Override
	public void map(LongWritable key, Text value, Context context) {
		...
		//출력을 위한 context의 인스턴스 제공
		context.write(new Text(year), new IntWritable(airTemperature));
	}
}

public class MaxTemperatureReducer extends Reducer<T> {
	@Override
	public void reduce(Text key, Iterable<IntWritable> values, Context context) {
		//리듀스의 입력타입은 맵함수의 출력타입인 Text와 IntWritable
		...
		context.write(key, new IntWritable(maxValue));
	}
}

자바 맵리듀스 잡을 구동하는 코드

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MaxTemperature {
	public static void main(String[] args) throws Exception {

		//Job 객체는 잡 명세서를 작성 함
		//JAR 파일로 묶으면 하둡은 클러스터의 해당 머신에 JAR 파일을 배포 함
		Job job = new Job();
		job.setJarByClass(MaxTemperature.class);
		job.setJobName("Max Temperature");

		//입출력 경로 지정
		...

		//맵, 리듀스 입출력 데이터 타입 지정
		...

		// 잡이 끝날 때까지 기다리며 진척 상황을 콘솔로 보고
		System.exit(job.waitForCompletion(true) ? 0 : 1);

	}
}

실행 로그

export HADOOP_CLASSPATH=hadoop-examples.jar
hadoop MaxTemperature *.txt output

m_0000_0 ⇒ map job // r_0000_0 ⇒ reduce job

매퍼에 있어 정상적인 입력 레코드는 한번에 하나의 출력 레코드를 생성함

리듀스 당 하나의 출력 파일이 생성 됨

분산형으로 확장하기

데이터 흐름

맵리듀스 잡: 클라이언트가 수행하는 작업의 기본 단위

하둡은 잡을 맵 태스크, 리듀스 태스크로 나누어 실행함

전체 데이터를 hdfs에 저장

각 태스크는 YARN을 이용하여 스케줄링되고 클러스터의 여러 노드에서 실행 됨.

Q) 실행중인 특정 노드의 태스크가 실패된다면?

A) 자동으로 다른 노드를 재할당하여 다시 실행 됨.

스플릿

잡의 입력을 고정크기 조각으로 분리하는 것

스플릿을 하는 이유

전체 입력을 통으로 처리하는 것 보다 스플릿으로 분리된 조각을 각각 처리하는 것이 훨씬 빠름 ⇒ 너무 작으면 스플릿 관리, 맵태스크 생성을 위한 오버헤드가 발생함 ⇒ 적절한 크기는 HDFS 블록의 기본 크기(클러스터의 설정에 따라 다름)

데이터 지역성 최적화

  • 입력데이터가 있는 노드에서 맵 태스크를 실행할 때 가장 빠르게 작동 함
  • 맵 태스크의 입력 스플릿에 해당하는 HDFS 블록 복제본이 저장된 세 개의 노드 모두 다른 맵 태스크를 실행하여 여유가 없을 경우 블록 복제본이 저장된 동일 랙에 속한 다른 노드에서 가용한 맵 슬롯을 찾음
  • 데이터 복제본이 저장된 노드가 없는 외부 랙의 노드가 선택될 경우 랙 간에 네트워크 전송이 불가피하게 일어남

리듀스태스크는 모든 맵의 결과를 입력으로 받으므로 데이터 지역성의 장점이 없다

rack

태스크 결과

Map

  • 로컬에 저장함
  • 중간결과물로서 잡이 완료되면 삭제 됨

Reduce

  • Hdfs에 저장함
  • hdfs블록의 첫번째 복제본은 로컬노드에 저장, 나머지 복제본은 외부 랙에 저장

리듀스 태스크가 하나일 경우

reduce

리듀스 태스크가 여럿일 경우

reduce

리듀스가 여럿이면 맵 태스크는 리듀스 수만큼 파티션을 생성하고 맵의 결과를 각 파티션에 분배.

셔플이 일어남 ⇒ 리듀스의 수를 선택하는 것은 잡의 실행 시간에 영향을 미치므로 튜닝이 필요

분산 맵리듀스 잡

EC2 10개 노드 ⇒ 6분

하둡 스트리밍

하둡은 자바 외에 다른 언어로 맵과 리듀스 함수를 작성할 수 있는 맵리듀스 API를 제공함

파이썬 코드 (맵 함수)

import re
import sys

for line in sys.stdin:
	val = line.strip()
	(year, temp, q) = (val[15:19], val[87:92], val[92,93])
	if (temp != "+9999" and re.match("[01459]", q):
		print "%s\t%s" % (year, temp)

파이썬 코드 (리듀스 함수)

import sys

(last_key, max_val) = (None, -sys.maxint)
for line in sys.stdin:
	(key, val) = line.strip().split("\t")
	if last_key and last_key != key:
		print "%s\t%s" % (last_key, max_val)
		(last_key, max_val) = (key, int(val))
	else:
		(last_key, max_val) = (key, max(max_val, int(val)))

if last_key:
	print "%s\t%s" % (last_key, max_val)