/**
 * Mapper that combines every input record with every row of a small table
 * held in memory (a map-side join).
 *
 * <p>The small table is read once per task in {@link #setup} from the file
 * named by the {@code smallTableLocation} configuration entry. Each of its
 * lines has the form {@code rowId<TAB>col_value,col_value,...}. Each input
 * record has the form {@code rowId<SPACE>col_value,col_value,...}.
 *
 * <p>For an input record {@code A} and a cached row {@code B} the mapper emits
 * the key {@code A.rowId} and the value {@code B.rowId + "_" + sum}, where
 * {@code sum} adds up {@code A.value * B.value} over every pair of entries
 * that carry the same column label.
 *
 * <p>NOTE(review): the variable names in the original code suggest the two
 * inputs are sparse matrices and the cached table is the (transposed)
 * right-hand operand of a matrix product; confirm against the job driver.
 */
public static class Map extends Mapper<Object, Text, Text, Text> {
    /** Reused output key; holds the row id of the current input record. */
    private Text outKey = new Text();
    /** Reused output value; holds {@code cachedRowId_sum}. */
    private Text outValue = new Text();
    /** Rows of the small table, parsed once in {@link #setup}, in file order. */
    private List<SparseRow> cachedRows = new ArrayList<SparseRow>();

    /**
     * Loads and parses the small table named by the {@code smallTableLocation}
     * configuration entry.
     *
     * <p>Blank lines are skipped. Any other line that does not match
     * {@code rowId<TAB>col_value,...} fails the task here with the exception
     * raised by the parsing code.
     *
     * @param context task context supplying the job configuration
     * @throws IOException if the small table cannot be opened or read
     * @throws InterruptedException propagated from {@code super.setup}
     */
    @Override
    protected void setup(Mapper<Object, Text, Text, Text>.Context context)
            throws IOException, InterruptedException {
        super.setup(context);
        Path smallTablePath = new Path(context.getConfiguration().get("smallTableLocation"));
        FileSystem hdfs = smallTablePath.getFileSystem(context.getConfiguration());
        FSDataInputStream hdfsReader = hdfs.open(smallTablePath);
        // try/finally guarantees both handles are released even when reading
        // or parsing throws part-way through the file.
        try {
            LineReader lineReader = new LineReader(hdfsReader);
            try {
                Text line = new Text();
                // readLine already strips the line terminator, so each Text
                // holds exactly one line and needs no further "\n" splitting.
                while (lineReader.readLine(line) > 0) {
                    String row = line.toString();
                    if (row.trim().length() == 0) {
                        continue; // tolerate blank lines, e.g. at the end of the file
                    }
                    cachedRows.add(parseRow(row, "\t"));
                }
            } finally {
                lineReader.close();
            }
        } finally {
            hdfsReader.close();
        }
        System.out.println("setup ok");
    }

    /**
     * Emits one output pair per cached row for the given input record.
     *
     * <p>Blank input records are skipped. A malformed record fails the task
     * with the exception raised by the parsing code.
     *
     * @param key input key supplied by the input format; not used
     * @param value one input line, {@code rowId<SPACE>col_value,col_value,...}
     * @param context sink for the {@code (rowId, cachedRowId_sum)} pairs
     * @throws IOException if writing to the context fails
     * @throws InterruptedException if writing to the context is interrupted
     */
    @Override
    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        String record = value.toString();
        if (record.trim().length() == 0) {
            return;
        }
        // Parse the record once instead of once per cached row.
        SparseRow left = parseRow(record, " ");
        outKey.set(left.rowId);

        for (SparseRow right : cachedRows) {
            int result = 0;
            for (int i = 0; i < left.columns.length; i++) {
                for (int j = 0; j < right.columns.length; j++) {
                    if (left.columns[i].equals(right.columns[j])) {
                        result += left.values[i] * right.values[j];
                    }
                }
            }
            outValue.set(right.rowId + "_" + result);
            // NOTE(review): this prints every emitted value to the task's
            // stdout; kept to preserve existing output, consider removing.
            System.out.println(outValue);
            context.write(outKey, outValue);
        }
    }

    /**
     * Parses one line of the form {@code rowId<sep>col_value,col_value,...}.
     *
     * @param line the raw, non-blank line
     * @param separatorRegex regular expression separating the row id from the
     *        entry list
     * @return the parsed row
     * @throws ArrayIndexOutOfBoundsException if the separator or the
     *         {@code _} inside an entry is missing
     * @throws NumberFormatException if an entry value is not an integer
     */
    private static SparseRow parseRow(String line, String separatorRegex) {
        String[] fields = line.split(separatorRegex);
        String rowId = fields[0];
        String[] entries = fields[1].split(",");

        String[] columns = new String[entries.length];
        int[] values = new int[entries.length];
        for (int i = 0; i < entries.length; i++) {
            String[] parts = entries[i].split("_");
            columns[i] = parts[0];
            values[i] = Integer.parseInt(parts[1]);
        }
        return new SparseRow(rowId, columns, values);
    }

    /** One parsed row: its id plus parallel arrays of column labels and values. */
    private static final class SparseRow {
        final String rowId;
        final String[] columns;
        final int[] values;

        SparseRow(String rowId, String[] columns, int[] values) {
            this.rowId = rowId;
            this.columns = columns;
            this.values = values;
        }
    }
}