spark利器2函数之dataframe大局排序id与分组后保存最大值行


作为一个算法工程师,日常学习和工作中,不光要 练习模型重视作用 ,更多的 时刻 是在 准备样本数据与剖析数据 等,而这些进程 都与 大数据 spark和hadoop生态 的若干东西休戚相关。

今日咱们就不在更新 机器学习算法模型 相关的内容,分享两个 spark函数 吧,以前也在某种场景中使用过但没有保存保藏,哎!! 事前不搜藏,临时抱佛脚 的感觉 真是 痛苦,太耽搁干活了

so,把这 两个函数 记在这儿 以备不时 之需~


(1) 得到 spark dataframe 大局排序ID

这个函数的 应用场景 便是:依据某一列的数值对 spark 的 dataframe 进行排序, 得到大局多分区排序的大局有序ID,新增一列保存这个rank id ,并且保存其他列的数据无变化

有用户会说,这不是很容易吗 ,直接用 orderBy 不就能够了吗,可是难点是:orderBy完记载下大局ID 并且 保持原来全部列的DF数据

多说无益,遇到这个场景 直接copy 用起来 就知道 有多爽 了,同类问题 咱们能够 用下面 这个函数 处理 ~

scala 写的 spark 版别代码:


@欢迎重视微信大众号:算法全栈之路
defdfZipWithIndex(
df:DataFrame,
offset:Int=1,
colName:String="rank_id",
inFront:Boolean=true
):DataFrame={
df.sqlContext.createDataFrame(
df.rdd.zipWithIndex.map(ln=>
Row.fromSeq(
(if(inFront)Seq(ln._2+offset)elseSeq())
++ln._1.toSeq++
(if(inFront)Seq()elseSeq(ln._2+offset))
)
),
StructType(
(if(inFront)Array(StructField(colName,LongType,false))elseArray[StructField]())
++df.schema.fields++
(if(inFront)Array[StructField]()elseArray(StructField(colName,LongType,false)))
)
)
}

函数调用咱们能够用这行代码调用: val ranked_df = dfZipWithIndex(raw_df.orderBy($"predict_score".desc)), 直接复制过去就能够~

python写的 pyspark 版别代码:

@欢迎重视微信大众号:算法全栈之路
frompyspark.sql.typesimportLongType,StructField,StructType
defdfZipWithIndex(df,offset=1,colName="rank_id"):
new_schema=StructType(
[StructField(colName,LongType(),True)]#newaddedfieldinfront
+df.schema.fields#previousschema
)
zipped_rdd=df.rdd.zipWithIndex()
new_rdd=zipped_rdd.map(lambda(row,rowId):([rowId+offset]+list(row)))
returnspark.createDataFrame(new_rdd,new_schema)

调用 同理 , 这儿我就不在进行赘述了。


(2)分组后保存最大值行

这个函数的 应用场景 便是: 当咱们使用 spark 或则 sparkSQL 查找某个 dataframe 数据的时候,在某一天里,恣意一个用户可能有多条记载,咱们需求 对每一个用户,保存dataframe 中 某列值最大 的那行数据

其中的 关键点 在于:一次性求出对每个用户分组后,求得每个用户的多行记载中,某个值最大的行进行数据保存

当然,经过 简略修正代码,纷歧定是最大,最小也是能够的,平均都ok

scala 写的 spark 版别代码:


@欢迎重视微信大众号:算法全栈之路
//得到一天内一个用户多个记载里面时刻最大的那行用户的记载
importorg.apache.spark.sql.expressions.Window
importorg.apache.spark.sql.functions
valw=Window.partitionBy("user_id")
valresult_df=raw_df
.withColumn("max_time",functions.max("time").over(w))
.where($"time"===$"max_time")
.drop($"max_time")

python写的 pyspark 版别代码:

@欢迎重视微信大众号:算法全栈之路
#pysparkdataframe某列值最大的元素所在的那一行
#GroupBy列并过滤Pyspark中某列值最大的行
#创立一个Window以按A列进行分区,并使用它来核算每个组的最大值。然后过滤出行,使B列中的值等于最大值
frompyspark.sqlimportWindow
w=Window.partitionBy('user_id')
result_df=spark.sql(raw_df).withColumn('max_time',fun.max('time').over(w))\
.where(fun.col('time')==fun.col('time'))
.drop('max_time')

咱们能够看到: 这个函数的关键便是运用了 spark 的 window 函数 ,灵活运用 威力无量 哦 !

到这儿,spark利器2函数之dataframe大局排序id与分组后保存最大值行 的全文 就写完了 。

看到不搜藏,用时忙断魂哦!! 赶紧保藏转发吧~


码字不易,觉得有收获就动动小手转载一下吧,你的支撑是我写下去的最大动力 ~

更多更全更新内容,欢迎重视作者的大众号: 算法全栈之路

spark利器2函数之dataframe全局排序id与分组后保留最大值行

  • END –