Today I studied deduplication in Spark.
Study time: 2 hours.
import org.apache.spark.sql.SparkSession

object MergeAndDeduplicate {
  def main(args: Array[String]): Unit = {
    // Create the SparkSession
    val spark = SparkSession.builder()
      .appName("Merge and Deduplicate")
      .master("local[*]")
      .getOrCreate()

    // Set the log level
    spark.sparkContext.setLogLevel("WARN")

    // Needed for the Dataset encoders used by map below
    import spark.implicits._

    // Check the input and output path arguments
    if (args.length != 3) {
      println("Usage: MergeAndDeduplicate <inputPathA> <inputPathB> <outputPath>")
      sys.exit(1)
    }
    val inputPathA = args(0)
    val inputPathB = args(1)
    val outputPath = args(2)

    // Read input files A and B as Dataset[String]
    val fileA = spark.read.textFile(inputPathA)
    val fileB = spark.read.textFile(inputPathB)

    // Parse each line into a tab-separated (key, value) pair
    val dataA = fileA.map { line =>
      val parts = line.split("\t")
      (parts(0), parts(1))
    }
    val dataB = fileB.map { line =>
      val parts = line.split("\t")
      (parts(0), parts(1))
    }

    // Merge the two datasets and drop duplicate records
    val combinedData = dataA.union(dataB).distinct()

    // Convert back to tab-separated strings for writing
    val outputData = combinedData.map {
      case (date, value) => s"$date\t$value"
    }

    // Save the result to the output path
    outputData.write.text(outputPath)

    // Stop the SparkSession
    spark.stop()
  }
}
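
As a quick check on what the union + distinct step actually does, here is a minimal local sketch that runs the same two operations on made-up in-memory rows instead of the two input files (the dates and values below are invented purely for illustration):

import org.apache.spark.sql.SparkSession

object DistinctSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("Distinct Sketch")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    // Two small in-memory datasets standing in for fileA and fileB
    val dataA = Seq(("20230101", "x"), ("20230102", "y")).toDS()
    val dataB = Seq(("20230102", "y"), ("20230103", "z")).toDS()

    // union keeps the duplicated ("20230102", "y") row; distinct drops it,
    // leaving three rows in the result
    dataA.union(dataB).distinct().show()

    spark.stop()
  }
}

To run the real job, package it into a jar and submit it with spark-submit, e.g. spark-submit --class MergeAndDeduplicate --master local[*] <jar> <inputPathA> <inputPathB> <outputPath>, where the jar and path arguments are placeholders to fill in.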