MapReduce for Python   2015-01-30


With hadoop-streaming, we can write MR programs in Python or other languages.

Map phase: mapper.py

#!/usr/bin/env python
import sys

for line in sys.stdin:
    line = line.strip()
    words = line.split()
    for word in words:
        print(word)

# This is only an example: it splits each line on whitespace and emits every word.

To make the script executable, add execute permission to mapper.py: `chmod +x mapper.py`.

Of course, in the Map phase you can also pass the input through unchanged by using `cat` as the mapper.
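As a sketch of how a conventional streaming reducer consumes this mapper's output: the shuffle delivers input sorted by key, so a word-count reducer only has to notice when the key changes. This snippet is not part of the original job, just an illustration of the streaming contract:

```python
# Sketch of a conventional streaming word-count reducer: the shuffle
# hands the reducer its input sorted by key, so counting consecutive
# identical words is sufficient.
def reduce_counts(lines):
    current, count = None, 0
    for line in lines:
        word = line.strip()
        if word == current:
            count += 1
        else:
            if current is not None:
                yield (current, count)
            current, count = word, 1
    if current is not None:
        yield (current, count)

# In a real job this would read sys.stdin; a sorted sample suffices here.
for word, n in reduce_counts(["apple", "apple", "banana"]):
    print("%s\t%d" % (word, n))
```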

Reduce phase: reducer.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright 2013 x Inc. All Rights Reserved

__author__ = 'Blair Chan'

import sys
import constant

from EsHelper import EsHelper


def insert_user_basic_consume_info(items, esHelper):
    basic_consume_info_doc = get_user_basic_consume_info_doc(items)
    if basic_consume_info_doc is None:
        return

    _id = basic_consume_info_doc['mobile_number']
    basic_consume_info_index = "basic_consume_info_index"

    esHelper.index(index=basic_consume_info_index,
                   doc_type=basic_consume_info_index,
                   id=_id,
                   data=basic_consume_info_doc)


def get_user_basic_consume_info_doc(items):
    doc = None
    try:
        doc = {
            "mobile_number": items[0],
            "first_consume_time": items[1]
        }
    except BaseException as e:
        # items may itself be malformed, so log the whole list rather
        # than a field that might not exist
        print("Exception in get_user_basic_consume_info_doc: %s, items: %s"
              % (str(e), items))
    return doc


def main():
    esHelper = EsHelper(constant.ES_URL)
    success_sum = 0

    for line in sys.stdin:
        line = line.strip()
        items = line.split('\001')

        if len(items) < 2:
            continue

        insert_user_basic_consume_info(items, esHelper)
        success_sum += 1

    print("Success:%d" % success_sum)


if __name__ == '__main__':
    main()
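constant.py and EsHelper.py are not shown in the post. A hypothetical minimal stand-in (the real EsHelper may differ) could wrap the Elasticsearch document PUT endpoint like this:

```python
# Hypothetical stand-ins for the constant and EsHelper modules that
# reducer.py imports -- the originals are not shown in this post.
import json
from urllib.request import Request, urlopen

ES_URL = "http://localhost:9200"   # would normally live in constant.py


class EsHelper(object):
    def __init__(self, es_url):
        self.es_url = es_url.rstrip('/')

    def index(self, index, doc_type, id, data):
        # Index one document: PUT /<index>/<doc_type>/<id>
        url = "%s/%s/%s/%s" % (self.es_url, index, doc_type, id)
        req = Request(url,
                      data=json.dumps(data).encode('utf-8'),
                      headers={'Content-Type': 'application/json'},
                      method='PUT')
        return urlopen(req)
```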

Local testing

cat data.txt | python mapper.py | sort | python reducer.py
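The reducer's parsing rules can also be exercised without Hadoop. A minimal sketch (the sample phone numbers are fabricated; '\001' is Hive's default field delimiter):

```python
# Smoke-test the reducer's input filtering: rows are '\001'-separated
# (Hive's default field delimiter) and rows with fewer than two fields
# are skipped, mirroring the len(items) < 2 check in reducer.py.
lines = [
    "13800138000" + "\001" + "2015-01-01",   # well-formed row
    "bad_line_without_delimiter",            # dropped by the filter
    "13900139000" + "\001" + "2015-01-02",   # well-formed row
]

valid = []
for line in lines:
    items = line.strip().split('\001')
    if len(items) < 2:
        continue
    valid.append(items)

print("Success:%d" % len(valid))   # Success:2
```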

Submitting to Hadoop

#!/bin/bash
cd `dirname $0`/.. && wk_dir=`pwd` && cd -
source ${wk_dir}/util/env

input_file="${OSS_URL}/${mds_hive_dir}/${table_user_basic_consume_info}/*"
output_file="${OSS_URL}/${tmp_hive_dir}/${table_user_basic_consume_info}/dt=${d1}"
reducer="reducer.py"
reducer_depend1="constant.py"
reducer_depend2="EsHelper.py"
archive="${OSS_URL}/share/packages/elasticsearch-5.0.0.tar.gz#elasticsearch-5.0.0"
## The dependency archive referred to by "archive" must be uploaded to HDFS first;
## the name after # is the directory name the archive is unpacked into.

${HADOOP} fs -rmr ${output_file}

cmd="${HADOOP} jar ${hadoop_streaming_jar}
-D mapred.map.tasks=100
-D mapred.reduce.tasks=100
-D stream.map.input.ignoreKey=true
-input ${input_file}
-output ${output_file}
-file ${reducer}
-file ${reducer_depend1}
-file ${reducer_depend2}
-mapper cat
-reducer ${reducer}
-cacheArchive ${archive}"


echo_ex "$cmd"
$cmd
check_success

Here hadoop_streaming_jar, set in util/env, points to the streaming jar:

hadoop_streaming_jar="/home/data_mining/share/packages/hadoop2/hadoop-streaming-2.7.2.jar"

The above is only an example. Even though inserting into ES this way can raise exceptions, this post is only meant to show how to write MapReduce programs in Python.

This article is published under the CC BY 4.0 International license; please keep the author attribution and article link when reposting. For any licensing questions, contact me by email.
