本文共享自华为云社区《【实战经验共享】基于Spark的大规模日志剖析【进步小菜猪大数据系列】》,作者:进步小菜猪。

跟着互联网的遍及和运用范围的扩大,越来越多的运用场景需求对海量数据进行高效地处理和剖析,这就要求咱们必须具有大数据技能方面的常识和技能。本篇文章将从一个实践项目动身,共享怎么运用 Spark 进行大规模日志剖析,并经过代码演示加深读者的了解。

1. 数据来历

咱们的项目是针对某购物网站的拜访日志进行剖析,其间主要包含以下几个字段:

  • IP:拜访的客户端 IP 地址
  • Time:拜访时刻
  • Url:拜访的 URL 地址
  • User-Agent:浏览器标识符

原始数据规模约为 100GB,咱们需求对其进行清洗、核算和剖析,以得到有用的信息和价值。

2. 数据清洗

由于原始数据存在缺失值、异常值、重复值等问题,因此咱们需求进行数据清洗,主要包含以下过程:

  1. 将原始数据进行格式转化,便利后续处理
  2. 对 IP、Time、Url 和 User-Agent 字段进行解析和提取
  3. 去除不合法的记载和重复的记载

详细代码完成如下:

import org.apache.spark.{SparkConf, SparkContext}
import java.text.SimpleDateFormat
import java.util.Locale
​
object DataCleaning {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("DataCleaning")
val sc = new SparkContext(conf)
val data = sc.textFile("hdfs://master:9000/log/access.log")
​
// 界说时刻格式及区域信息
val dateFormat = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss Z", Locale.ENGLISH)
​
// 数据清洗
val cleanData = data.map(line => {
val arr = line.split(" ")
if (arr.length >= 9) {
// 解析 IP
val ip = arr(0)
​
// 解析时刻,转化为 Unix 时刻戳
val time = dateFormat.parse(arr(3) + " " + arr(4)).getTime / 1000// 解析 URL
val url = urlDecode(arr(6))
​
// 解析 UserAgent
val ua = arr(8)
​
(ip, time, url, ua)
}
}).filter(x => x != null).distinct()
​
// 成果输出
cleanData.saveAsTextFile("hdfs://master:9000/cleanData")
​
sc.stop()
}
​
// URL 解码
def urlDecode(url: String): String = {
java.net.URLDecoder.decode(url, "utf-8")
}
}

3. 数据核算

对于大规模数据的处理,咱们能够运用 Spark 供给的强壮的分布式核算才能,以进步处理效率和减少核算时刻。

咱们这里运用 Spark SQL 核算每个 URL 的拜访量,并输出前 10 个拜访量最高的 URL,代码如下:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
​
case class LogRecord(ip: String, time: Long, url: String, ua: String)
​
object DataAnalysis {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("DataAnalysis")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
​
// 读取清洗后的数据
val cleanData = sc.textFile("hdfs://master:9000/cleanData").filter(x => x != null)
​
// 将数据转化为 DataFrame
import sqlContext.implicits._
val logDF = cleanData.map(_.split(",")).map(p => LogRecord(p(0), p(1).toLong, p(2), p(3))).toDF()
​
// 核算每个 URL 的拜访量,并按拜访量降序排序
val topUrls = logDF.groupBy("url").count().sort($"count".desc)
​
// 输出前 10 个拜访量最高的 URL
topUrls.take(10).foreach(println)
​
sc.stop()
}
}

4. 数据可视化

数据可视化是将处理和剖析后的数据以图表或图画的方法展示出来,有利于咱们直观地调查数据的规则和趋势。

咱们这里采用 Python 的 Matplotlib 库将前 10 个拜访量最高的 URL 可视化,代码如下:

import matplotlib.pyplot as plt
​
# 读取数据
with open('topUrls.txt', 'r') as f:
line = f.readline()
urls = []
counts = []
while line and len(urls) < 10:
url, count = line.strip().split(',')
urls.append(url)
counts.append(int(count))
line = f.readline()
# 制作直方图
plt.bar(range(10), counts, align='center')
plt.xticks(range(10), urls, rotation=90)
plt.xlabel('Url')
plt.ylabel('Count')
plt.title('Top 10 Url')
plt.show()

在进行数据清洗前,需求先对原始日志数据进行挑选,选取需求剖析的字段。然后进行数据清洗,去掉不必要的空格、特殊字符等,使数据愈加规整,并增加可读性。

下面是数据清洗的代码示例:

val originalRdd = spark.sparkContext.textFile("path/to/logfile")
​
val filteredRdd = originalRdd.filter(line => {
val tokens = line.split("\t")
tokens.length >= 10 &&
tokens(0).matches("\d{4}-\d{2}-\d{2}") &&
tokens(1).matches("\d{2}:\d{2}:\d{2}") &&
tokens(2).matches("\d+") &&
tokens(3).matches("\d+") &&
tokens(4).matches("\d+") &&
tokens(5).matches("\d+") &&
tokens(6).matches(".+") &&
tokens(7).matches(".+") &&
tokens(8).matches(".+") &&
tokens(9).matches(".+")
})
​
val cleanedRdd = filteredRdd.map(line => {
val tokens = line.split("\t")
val timestamp = s"${tokens(0)} ${tokens(1)}"
val request = tokens(6).replaceAll(""", "")
val responseCode = tokens(8).toInt
(timestamp, request, responseCode)
})

在上述代码中,咱们首先读取原始日志数据,并运用filter函数过滤掉不符合条件的行;然后运用map函数将数据转化为元组的方式,并进行清洗。其间,元组的三个元素分别是时刻戳、恳求内容和呼应状况码。

接下来,让咱们来介绍一下怎么运用Spark进行数据核算。

数据核算是大规模数据剖析中非常重要的一个环节。Spark供给了丰富的聚合函数,可用于对数据进行各种核算剖析。

下面是对清洗后的数据进行核算剖析的代码示例:

import org.apache.spark.sql.functions._
​
val df = spark.createDataFrame(cleanedRdd).toDF("timestamp", "request", "responseCode")
val totalCount = df.count()
val errorsCount = df.filter(col("responseCode") >= 400).count()
val successCount = totalCount - errorsCount
val topEndpoints = df.groupBy("request").count().orderBy(desc("count")).limit(10)
topEndpoints.show()

在上面的代码中,咱们首先将清洗后的数据转化为DataFrame,然后运用count函数核算总记载数和过错记载数,并核算成功记载数。最后运用groupBy和orderBy函数按照恳求内容,对数据进行分组核算,并打印出恳求次数最多的前10个端点。

经过可视化,咱们能够清楚地看到前 10 个拜访量最高的 URL 地址及其拜访量,这对于进一步剖析和优化网站的性能和用户体验具有重要的含义。

总结起来,这就是咱们的一个大数据实战项目,咱们运用 Spark 核算了购物网站的拜访量,并经过 Python 的 Matplotlib 库将成果可视化。这个过程中,咱们运用了数据清洗、Spark SQL 核算和可视化等技能,为大规模数据的处理和剖析供给了有效的解决方案。

点击关注,第一时刻了解华为云新鲜技能~