mapreduce 跑python代码过程

it2022-05-05  63

记得在代码顶部加上python的环境路径  (#!/usr/bin/python)  linux环境下需要找到python解释器

1.  chmod -R 777 x.py      给代码文件赋予权限

2.  chown 用户(hadoop)  x.py  给代码转换用户

3.  chgrp  用户(hadoop)  x.py  给代码转换所属组

4.  vi  x.py  打开代码 输入" : "  再输入 set ff=unix  改变py文件的保存方式,否则linux可能不识别windows的默认保存方式

5.  Mapreduce 运行py文件命令:

  (1)先找到 streaming的jar包  (貌似是用来转换其他语言为java代码)  路径在 $HADOOP_PATH/share/hadoop/tools/lib/hadoop-streaming-x.jar  (目前高版本路径,老版本不是这个路径)

  (2)输入命令:

    hadoop jar hadoop-streaming-x.jar -file ....../map.py -mapper ....../map.py -file ....../reduce.py -reducer ....../reduce.py -input /.../data.txt (hdfs路径)  -output  /.../out

 map reduce 实例代码:

map:

#!/usr/bin/python import sys for line in sys.stdin: line.strip() data = line.split('\t') # print(data) priarea = data[0] date = data[4] time = 15 * 96 user = data[3] print('%s %s %s %s 1'%(priarea,date,user,time))

reduce:

#!/usr/bin/python import sys out = {} for line in sys.stdin: line = line.strip() data = line.split(' ') # print(data) priarea = data[0] date = data[1] time = data[3] mykey = '%s_%s'%(priarea,date) # print(key) if mykey in out.keys(): out[mykey]['sum_time'] = float(out[mykey]['sum_time']) + float(time) out[mykey]['user_count'] = float(out[mykey]['user_count']) + 1 else: out[mykey] = {'sum_time':time,'user_count':1.0} for key in out.keys(): key_list = key.split('_') priarea = key_list[0] date = key_list[1] print('%s %s %s %s' % (priarea, date, out[key]['user_count'], out[key]['sum_time']))

 

  

转载于:https://www.cnblogs.com/cxhzy/p/11195687.html


最新回复(0)