目录
- 实现代码
- 实现思路
- 阶段一:部分乘积计算
- 阶段二:聚合求和
- 输入格式要求
- 运行流程
- 注意事项
- 关键概念图解
- 具体例子详细跟踪程序的执行过程
- 输入文件格式
- 阶段一执行过程
- Mapper处理流程
- Reducer处理过程
- 阶段一最终输出
- 阶段二执行过程
- Mapper处理(直接传递)
- Reducer聚合求和
- 最终输出结果
- 可视化数据流向
实现代码
使用Hadoop的MapReduce实现矩阵相乘是可行的,但需要合理设计数据分发和处理逻辑。以下是基于两阶段MapReduce的实现思路:
实现思路
- 阶段一:计算每个k对应的部分乘积。
- 阶段二:聚合所有部分乘积,得到最终结果矩阵。
阶段一:部分乘积计算
Mapper:
- 处理矩阵A的元素,输出键为列号k,值为(
A
, 行号i, 值)。 - 处理矩阵B的元素,输出键为行号k,值为(
B
, 列号j, 值)。
Reducer:
- 对每个k,收集所有A的行i的值和B的列j的值。
- 计算笛卡尔积,输出键为(i,j),值为A[i][k] * B[k][j]。
示例代码:
/*** 第一阶段Mapper:负责分发矩阵元素到Reducer*/
public class Phase1Mapper extends Mapper<Object, Text, Text, Text> {// 输入值格式示例:// 矩阵A元素:A,行号i,列号k,值 如 "A,2,3,5.0"// 矩阵B元素:B,行号k,列号j,值 如 "B,3,4,2.0"public void map(Object key, Text value, Context context)throws IOException, InterruptedException {// 将输入文本按逗号分割String[] tokens = value.toString().split(",");String matrix = tokens[0]; // 获取矩阵标识(A或B)if (matrix.equals("A")) {// 处理矩阵A的元素int i = Integer.parseInt(tokens[1]); // 行号iint k = Integer.parseInt(tokens[2]); // 列号k(即中间维度)double aVal = Double.parseDouble(tokens[3]); // 元素值// 输出键为k,值为"A,行号i,值"(通过k进行分组)// 示例:key="3", value="A,2,5.0"context.write(new Text(String.valueOf(k)), new Text("A," + i + "," + aVal));} else if (matrix.equals("B")) {// 处理矩阵B的元素int k = Integer.parseInt(tokens[1]); // 行号k(即中间维度)int j = Integer.parseInt(tokens[2]); // 列号jdouble bVal = Double.parseDouble(tokens[3]); // 元素值// 输出键为k,值为"B,列号j,值"// 示例:key="3", value="B,4,2.0"context.write(new Text(String.valueOf(k)), new Text("B," + j + "," + bVal));}}
}/*** 第一阶段Reducer:计算部分乘积*/
public class Phase1Reducer extends Reducer<Text, Text, Text, DoubleWritable> {public void reduce(Text key, Iterable<Text> values, Context context)throws IOException, InterruptedException {// 存储来自矩阵A和B的元素List<String> aEntries = new ArrayList<>(); // 格式:i,valueList<String> bEntries = new ArrayList<>(); // 格式:j,value// 分类处理来自不同矩阵的值for (Text val : values) {String[] parts = val.toString().split(",");if (parts[0].equals("A")) {// A矩阵元素:格式为 A,i,valueaEntries.add(parts[1] + "," + parts[2]); // 保存i和值} else {// B矩阵元素:格式为 B,j,valuebEntries.add(parts[1] + "," + parts[2]); // 保存j和值}}// 计算笛卡尔积(所有i和j的组合)for (String a : aEntries) {String[] aParts = a.split(",");int i = Integer.parseInt(aParts[0]); // 矩阵A的行号double aVal = Double.parseDouble(aParts[1]); // 矩阵A的值for (String b : bEntries) {String[] bParts = b.split(",");int j = Integer.parseInt(bParts[0]); // 矩阵B的列号double bVal = Double.parseDouble(bParts[1]); // 矩阵B的值// 计算乘积并输出到第二阶段double product = aVal * bVal;// 输出键为(i,j),值为乘积结果// 示例:key="2,4", value=10.0context.write(new Text(i + "," + j), new DoubleWritable(product));}}}
}
阶段二:聚合求和
Mapper:直接传递键值对。
Reducer:对相同(i,j)键的值求和。
示例代码:
/*** 第二阶段Mapper(恒等映射):直接传递键值对* 输入格式:i,j productValue*/
public class Phase2Mapper extends Mapper<Object, Text, Text, DoubleWritable> {public void map(Object key, Text value, Context context)throws IOException, InterruptedException {// 将输入行分割为键值对(格式:i,j\tproduct)String[] parts = value.toString().split("\t");String outputKey = parts[0]; // 保持键不变double outputValue = Double.parseDouble(parts[1]);// 直接传递键值对context.write(new Text(outputKey), new DoubleWritable(outputValue));}
}/*** 第二阶段Reducer:求和得到最终结果*/
public class Phase2Reducer extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {public void reduce(Text key, Iterable<DoubleWritable> values, Context context)throws IOException, InterruptedException {double sum = 0.0;// 对相同(i,j)的所有部分乘积求和for (DoubleWritable val : values) {sum += val.get(); // 累加值}// 输出最终结果context.write(key, new DoubleWritable(sum));}
}
输入格式要求
// 矩阵A的每个元素需要按以下格式存储:
A,i,k,value
// 例如:A,0,2,3.14 表示A[0][2] = 3.14// 矩阵B的每个元素需要按以下格式存储:
B,k,j,value
// 例如:B,2,5,6.28 表示B[2][5] = 6.28// 文件示例:
A,0,0,1.0
A,0,1,2.0
B,0,0,3.0
B,1,0,4.0
运行流程
- 输入格式:矩阵A和B以特定格式存储,如
A,i,k,value
和B,k,j,value
。 - 执行阶段一:计算每个k对应的部分乘积。
- 执行阶段二:对部分乘积求和,得到最终结果。
注意事项
- 性能问题:阶段一的笛卡尔积可能导致中间数据量爆炸,适用于小规模矩阵或稀疏矩阵。
- 优化:可通过矩阵分块、Combiner优化或调整数据分发策略来减少数据传输。
关键概念图解
矩阵相乘原理:
C[i][j] = Σ(A[i][k] * B[k][j]) ,对k从1到n求和MapReduce数据流:
矩阵A元素 (i,k) → 通过k分发 → Reducer
矩阵B元素 (k,j) → 通过k分发 → Reducer
Reducer内计算所有i和j的组合乘积阶段一输出:
(0,0) → 1 * 3 + 2 * 4 = 11
(0,1) → 1 * 5 + 2 * 6 = 17
...(其他元素同理)阶段二输出:
(0,0) 11
(0,1) 17
...(最终结果矩阵)
通过上述方法,可以利用Hadoop的MapReduce实现矩阵相乘,虽然存在性能限制,但理论上是可行的。
具体例子详细跟踪程序的执行过程
让我们通过一个具体的2x2矩阵相乘示例,详细跟踪程序的执行过程。假设:
矩阵A:
[1 2]
[3 4]
矩阵B:
[5 6]
[7 8]
最终结果C = A×B 应该是:
[1×5+2×7=19, 1×6+2×8=22]
[3×5+4×7=43, 3×6+4×8=50]
输入文件格式
文件内容(假设存储为input/matrix.txt
):
A,0,0,1.0
A,0,1,2.0
A,1,0,3.0
A,1,1,4.0
B,0,0,5.0
B,0,1,6.0
B,1,0,7.0
B,1,1,8.0
阶段一执行过程
Mapper处理流程
每个Mapper会处理一行输入:
输入行 | 输出键 | 输出值 | 说明 |
---|---|---|---|
A,0,0,1.0 |
"0" | "A,0,1.0" | 矩阵A第0行第0列元素 |
A,0,1,2.0 |
"1" | "A,0,2.0" | 矩阵A第0行第1列元素 |
A,1,0,3.0 |
"0" | "A,1,3.0" | 矩阵A第1行第0列元素 |
A,1,1,4.0 |
"1" | "A,1,4.0" | 矩阵A第1行第1列元素 |
B,0,0,5.0 |
"0" | "B,0,5.0" | 矩阵B第0行第0列元素 |
B,0,1,6.0 |
"0" | "B,1,6.0" | 矩阵B第0行第1列元素 |
B,1,0,7.0 |
"1" | "B,0,7.0" | 矩阵B第1行第0列元素 |
B,1,1,8.0 |
"1" | "B,1,8.0" | 矩阵B第1行第1列元素 |
Reducer处理过程
按key分组后:
Reducer处理key="0"(k=0)时:
- 收到的values列表:
["A,0,1.0", "A,1,3.0", "B,0,5.0", "B,1,6.0"]
- 分类存储:
aEntries = ["0,1.0", "1,3.0"] // 矩阵A第0列元素(即k=0) bEntries = ["0,5.0", "1,6.0"] // 矩阵B第0行元素(即k=0)
- 计算笛卡尔积:
(0,0) → 1.0*5.0 = 5.0 (0,1) → 1.0*6.0 = 6.0 (1,0) → 3.0*5.0 = 15.0 (1,1) → 3.0*6.0 = 18.0
Reducer处理key="1"(k=1)时:
- 收到的values列表:
["A,0,2.0", "A,1,4.0", "B,0,7.0", "B,1,8.0"]
- 分类存储:
aEntries = ["0,2.0", "1,4.0"] // 矩阵A第1列元素(k=1) bEntries = ["0,7.0", "1,8.0"] // 矩阵B第1行元素(k=1)
- 计算笛卡尔积:
(0,0) → 2.0*7.0 = 14.0 (0,1) → 2.0*8.0 = 16.0 (1,0) → 4.0*7.0 = 28.0 (1,1) → 4.0*8.0 = 32.0
阶段一最终输出
0,0 → 5.0
0,1 → 6.0
1,0 → 15.0
1,1 → 18.0
0,0 → 14.0
0,1 → 16.0
1,0 → 28.0
1,1 → 32.0
阶段二执行过程
Mapper处理(直接传递)
输入来自阶段一的输出,Mapper不做处理,直接传递:
(0,0) → 5.0
(0,1) → 6.0
(1,0) → 15.0
...(其他值同理)
Reducer聚合求和
按(i,j)分组求和:
处理(0,0):
- 值列表:[5.0, 14.0]
- 求和:5.0 + 14.0 = 19.0
处理(0,1):
- 值列表:[6.0, 16.0]
- 求和:6.0 + 16.0 = 22.0
处理(1,0):
- 值列表:[15.0, 28.0]
- 求和:15.0 + 28.0 = 43.0
处理(1,1):
- 值列表:[18.0, 32.0]
- 求和:18.0 + 32.0 = 50.0
最终输出结果
0,0 → 19.0
0,1 → 22.0
1,0 → 43.0
1,1 → 50.0
可视化数据流向
矩阵A元素分布:
k=0 → A(0,0)=1.0, A(1,0)=3.0
k=1 → A(0,1)=2.0, A(1,1)=4.0矩阵B元素分布:
k=0 → B(0,0)=5.0, B(0,1)=6.0
k=1 → B(1,0)=7.0, B(1,1)=8.0阶段一Reducer处理:
k=0时:(i=0) * (j=0) → 1*5=5(i=0) * (j=1) → 1*6=6(i=1) * (j=0) → 3*5=15(i=1) * (j=1) → 3*6=18k=1时:(i=0) * (j=0) → 2*7=14(i=0) * (j=1) → 2*8=16(i=1) * (j=0) → 4*7=28(i=1) * (j=1) → 4*8=32阶段二聚合:
C[0][0] = 5 + 14 = 19
C[0][1] = 6 + 16 = 22
C[1][0] = 15 + 28 = 43
C[1][1] = 18 + 32 = 50
通过这个具体案例,可以清晰看到:
- 数据如何通过
k
进行分组 - Reducer如何计算部分乘积
- 两阶段MapReduce如何协作完成矩阵相乘