一、什么是 DGA 域名
DGA 是 Domain Generation Algorithm(域名生成算法)的简称,是指使用主控端和被控端协商好的一种基于随机算法的域名生成协议,简单来说就是生成一个随机字符串来作为域名并进行注册,将其作为 C&C 服务器的域名并不定时经常性更换。由于具备强随机性,短时效性,通过 DGA 生成的域名往往在查杀上更具被难度。
1.1 DGA 的使用场景
攻击者攻陷服务器后,需要和 C&C(Control & Command)服务器通信来控制服务器并下发命令,从而执行任务。通信通过域名进行,如果攻击者使用的是一个固定的域名来通信,运维人员和容易将这些域名加入黑名单进行拦截。为了绕过黑名单,攻击者在被控机和下发命令的设备上,使用同一个 DGA 算法生成随机域名来进行通信。
二、分类算法 SVM
2.1 基本概念
SVM 全称支持向量机(Support Vector Machine)是一种强大的监督学习算法,主要用于分类和回归分析。其核心思想是找到一个最优的决策边界(超平面),使得不同类别之间的间隔最大化。
2.2 核心原理
2.2.1 线性可分情况
举一个例子:
# 二维空间中的简单示例
import numpy as np
import matplotlib.pyplot as plt
# 假设我们有两类数据
class1 = np.array([[1, 2], [2, 3], [3, 3]]) # 正类
class2 = np.array([[3, 1], [4, 2], [5, 1]]) # 负类
SVM 的目标是找到一条直线 w·x + b = 0,使得 margin = 2/‖w‖ 最大化。
数学表达:
决策函数:$f(x) = w^T x + b$
分类规则:
$w^T x + b geq 1$ 时,预测为正类
$w^T x + b leq -1$ 时,预测为负类
2.2.2 支持向量
支持向量是距离决策边界最近的那些数据点,它们“支撑”着整个分类边界。SVM 的训练结果只依赖于这些关键的支持向量。
2.3 Spark MLlib 中的 SVM 实现
2.3.1 LinearSVC 类
import org.apache.spark.ml.classification.LinearSVC
val svm = new LinearSVC()
.setMaxIter(100) // 最大迭代次数
.setRegParam(0.1) // 正则化参数 (1/C)
.setTol(1e-6) // 收敛容忍度
.setFitIntercept(true) // 是否拟合截距项
.setStandardization(true) // 是否标准化特征
2.4 SVM 的优势与局限
2.4.1 优势
理论完备:基于统计学习理论的坚实数学基础;全局最优:凸优化问题,保证找到全局最优解;泛化能力强:最大化间隔提高泛化性能;核技巧:可处理非线性核问题;稀疏性:决策只依赖于支持向量;
2.4.2 局限
计算复杂度:大规模数据训练较慢;参数敏感:核函数和参数选择影响很大;概率输出:不直接提供概率估计只支持二分类:要多分类需要构造多个二分类器;
三、Spark MLlib 使用 SVM 基于域名字符串检测 DGA
在 DGA 域名检测中,SVM 特别适合。因为:
特征明确:域名长度、熵值、字符分布等特征清晰;二分类问题:只用区分正常域名和 DGA 域名;边界清晰:两类域名在特征空间中有较好分离性;
3.1 整体应用类
3.1.1 创建 object 并配置 Spark 环境
val spark = SparkSession
.builder
.appName("DgaAnalyseApplication")
.getOrCreate()
3.1.2 从 Kafka 消费数据并封装为 Bean 对象
//使用 DataSet
import spark.implicits._
//sql 函数
import org.apache.spark.sql.functions._
val kafkaDS: Dataset[String] = spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", KafKaConfig.BROKERLIST)
.option("group.id", KafkaConfig.DGA_ANALYSE_GROUP_ID)
.option("failOnDataLoss", "false")
.option("auto.offset.reset", "latest")
.option("subscribe", KafkaConfig.DNS_ETL_TOPIC)
.load()
.selectExpr("CAST(value AS STRING)")
.as[String]
val dnsDs: Dataset[DnsData] = kafkaDS.map(jsonStr => {
try {
if (StringUtils.isNotBlank(jsonStr)) {
val dnsData = JSON.parseObject(jsonStr, classOf[DnsData])
dnsData
}else{
null
}
} catch {
case ex: Exception => ex.printStackTrace()
null
}
}).filter(x => x!=null)
3.1.3 提取特征
//注册特征提取 UDF 函数
val extractFeatureUDF: UserDefinedFunction = functions.udf(ExtractFeature.extractFeature _)
//提取特征
val parsedData:DataFrame = dnsDsFilted.withColumn("domain",$"dns_query").withColumn("rawFeature",extractFeatureUDF($"domain"))
特征提取 UDF 函数
object ExtractFeature {
// 特征1:点占长度的比例
// 特征2:元音占长度的比例
// 特征3:字符集占长度的比例
// 特征4:数字所占的比例
// 特征5:生成概率归一化
// 特征6:长度归一化
// 特征7:隔符内数字个数的最大值
// 特征8:数字字母转换的频率
def extractFeature(raw_domain:String): linalg.Vector = {
//改大写 去掉"www."
val domain = domainTransform(raw_domain)
var counter = Map.empty[Char, Int]
val urllen=domain.length
val urlchartset=scala.collection.mutable.Set.empty[Char]
var featurelist=List.empty[Double]
for (k <- DNSConfig.charsetlist){
counter+=(k->0)
}
counter+=(' '->0)
for (s <- domain.toArray) {
val a=s.toUpper
if (counter.contains(a))
counter += (a -> (counter(a) + 1))
else
counter += (a -> (counter(' ') + 1))
urlchartset+=(a)
}
var maxnumdigit=0
val sp=domain.split("\.", -1)
for (s<-sp){
var numdigit=0
for (a <- s) {
if (a.isDigit){
numdigit+=1
}
}
maxnumdigit=math.max(maxnumdigit,numdigit)
}
var switchfrequency = 0.0;
for ( i <- 1 to (domain.length - 1)) {
if (domain(i).isDigit&&domain(i-1).isLetter){
switchfrequency+=1
}
if (domain(i-1).isDigit&&domain(i).isLetter){
switchfrequency+=1
}
}
val feature1:Double=1.0*counter('.')/urllen
val feature2:Double=1.0*(counter('A')+counter('E')+counter('I')+counter('O')+counter('U'))/urllen
val feature3:Double=1.0*urlchartset.toArray.length/urllen
val feature4:Double=1.0*(counter('0')+counter('1')+counter('2')+counter('3')+counter('4')+counter('5')+counter('6')+counter('7')+counter('8')+counter('9'))/urllen
val feature5:Double=(calNbcProb(domain)-DNSConfig.probmean)/DNSConfig.probstd
val feature6:Double=(urllen-DNSConfig.lenmean)/DNSConfig.lenstd
val feature7:Double=1.0*maxnumdigit/DNSConfig.max_num
val feature8:Double=1.0*switchfrequency/DNSConfig.SwitchFreq
return Vectors.dense(Array(feature1,feature2,feature3,feature4,feature5,feature6,feature7,feature8))
}
}
归一化,就是把数据的范围放缩至一个相同的范围以避免数据跨度或量级的巨大差别而导致训练上的困难或者结果上的偏差。
3.1.4 加载预训练 SVM 模型
val model = PipelineModel.load(hdfsUri+"/spark_dw_deep_detection/svm/Data/model")
3.1.5 对实时流应用预训练模型
val model_result:DataFrame = model.transform(parsedData)
3.1.6 输出预测结果
val query = model_result
.writeStream
.outputMode(OutputMode.Append()) //流式输出模式
.trigger(Trigger.ProcessingTime("20 seconds"))
.format("console")
.start()
query.awaitTermination() //持续运行
//关闭 Spark
spark.stop()
3.2 SVM 模型训练
3.2.1 配置 Spark
val spark = SparkSession.builder.appName("DNSModelTranning").master("local").getOrCreate()
3.2.2 读取训练集数据
import spark.implicits._
val train_path = "src\resource\Data\svmtrainset.txt"
val train_df: Dataset[(String, Int)] = spark.read.text(train_path)
.sample(false, 0.001) //抽取 0.1% 的数据
.as[String]
.map(x => x.split(" ", -1)) //按指标符分割
.map(x => trainDataFit(x)) //数据适配处理
适配函数:
def trainDataFit(x: Array[String]): (String,Int) = {
try {
val lineSplit = x
var domain = lineSplit(0).toUpperCase.replaceAll(" ", "")
var label = lineSplit(1).toInt
//去掉域名的"WWW."前缀
if(domain.length > 4){
//println(domain)
if(domain.substring(0, 4).equals("WWW.")) {
domain = domain.substring(4, domain.length)
//println(domain)
}
}
return (domain,label)
} catch {
case ex: ArrayIndexOutOfBoundsException =>
return null
}
}
数据量为 34w+,数据格式如下:

3.2.3 提取特征
val parsedData = train_df.map(x=>{
(x._1,x._2,ExtractFeature.extractFeature(x._1))
}).withColumnRenamed("_1","domain").withColumnRenamed("_2","label").withColumnRenamed("_3","rawFeature")
parsedData.show(false)
//切分输入数据为测试数据和验证数据
val Array(trainingData, testData) = parsedData.randomSplit(Array(0.8,0.2))
3.2.4 特征预处理管道
3.2.4.1 标签索引化
将字符串标签转换为数值索引
//把文字标签转为index
val labelIndexer = new StringIndexer()
.setInputCol("label")//指定输入列名
.setOutputCol("indexedLabel")//指定输出列名
.fit(trainingData) //拟合
使用训练数据来拟合一个 StringIndexerModel,它会根据标签的出现频率进行排序,出现最多的标签索引为 0。拟合后,我们可以通过 labelIndexer.labels 获取标签的原始字符串数组,按频率排序。
3.2.4.2 特征索引化
对特征向量中的分类特征进行索引化,它会自动判断哪些特征是离散的(类别特征),并将这些离散特征转换为索引表示。连续特征则保持不变。
val featureIndexer = new VectorIndexer()
.setInputCol("rawFeature")//指定输入特征向量列
.setOutputCol("indexedFeature")//指定输出列名
.setMaxCategories(3)
.fit(trainingData)
setMaxCategories(3):设置一个阈值,当特征的不同取值数量小于等于这个阈值时,该特征会被认为是分类特征,并进行索引化。这里设置为 3,意思是如果某个特征的不同取值数量<=3,则被视为分类特征。fit(trainingData):使用训练数据来拟合一个 VectorIndexerModel,它会决定哪些特征需要被索引化,并记录每个分类特征的映射关系。
3.2.5 SVM 模型配置
val svm = new LinearSVC()
.setMaxIter(1000) //最大迭代次数
.setRegParam(0.1) //正则化参数 默认0.0
.setFeaturesCol(featureIndexer.getOutputCol)//设置特征列输出列名
.setLabelCol(labelIndexer.getOutputCol)//设置标签列输出列名
.setPredictionCol("prediction")//设置预测结果列名
setMaxIter(1000):优化算法最大迭代次数,确保收敛
setRegParam(0.1):正则化参数,控制模型复杂度
相当于 1/C,C 是惩罚系数
值越大正则化越强,防止过拟合
LinearSVC 是 Spark MLlib 中实现的先行支持向量分类器,专门用于二分类问题。它是 SVM 的一种高效线性版本,特别适合处理大规模数据集。
LinearSVC 与 标准 SVM 的区别:
| 特性 | LinearSVC | 标准 SVM |
| 核函数 | 只支持线性核 | 支持多种核函数(RBF、多项式等) |
| 适用场景 | 大规模线性分类 | 中小规模非线性分类 |
| 计算效率 | 更高,适合大数据 | 相对较慢 |
| Spark 支持 | 原生支持 | 需要通过其他库 |
3.2.6 标签转化器
将数值预测结果转换为原始标签字符串,便于理解
//将index标签转为字符
val labelConverter = new IndexToString()
.setLabels(labelIndexer.labels) // 设置原始标签
.setInputCol(svm.getPredictionCol) // 输入预测列
.setOutputCol("PredictedLabel") // 输出可读标签列
3.2.7 管道组装和训练
val pipeline = new Pipeline().setStages(Array(labelIndexer,featureIndexer,svm,labelConverter))
//训练集80%
val model = pipeline.fit(trainingData)
//训练
val predictionResultDF = model.transform(testData)
predictionResultDF.show(false)
3.2.8 模型评估
val evaluator = new MulticlassClassificationEvaluator()
.setLabelCol(labelIndexer.getOutputCol)
.setPredictionCol(svm.getPredictionCol)
//.setMetricName("precision")
//评估结果
val predictionAccuracy = evaluator.evaluate(predictionResultDF)
println("Testing Accuracy is %2.4f".format(predictionAccuracy * 100) + "%")
:多分类评估器
MulticlassClassificationEvaluator
默认使用 f1-score 作为评估指标
也可以设置其他指标:,
precision,
recall
accuracy
3.2.9 模型及管道保存
pipeline.write.overwrite().save("src\resource\Data\pipeline")
model.write.overwrite().save("src\resource\Data\model")
//关闭 Spark
spark.stop
3.2.10 完整数据处理流程
// 原始数据流程:
训练数据 → 数据清洗 → 特征提取 → 标签索引化 → 特征索引化 → SVM训练 → 模型评估
// 预测时流程:
新域名 → 特征提取 → 标签索引化 → 特征索引化 → SVM预测 → 标签转换 → 可读结果