我的编程空间,编程开发者的网络收藏夹
学习永远不晚

sparkdataframe全局排序id与分组后保留最大值行

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

sparkdataframe全局排序id与分组后保留最大值行

正文

作为一个算法工程师,日常学习和工作中,不光要 训练模型关注效果 ,更多的 时间 是在 准备样本数据与分析数据 等,而这些过程 都与 大数据 spark和hadoop生态 的若干工具息息相关。

今天我们就不在更新 机器学习算法模型 相关的内容,分享两个 spark函数 吧,以前也在某种场景中使用过但没有保存收藏,哎!! 事前不搜藏,临时抱佛脚 的感觉 真是 痛苦,太耽误干活了

so,把这 两个函数 记在这里 以备不时 之需~

(1) 得到 spark dataframe 全局排序ID

这个函数的 应用场景 就是:根据某一列的数值对 spark 的 dataframe 进行排序, 得到全局多分区排序的全局有序ID,新增一列保存这个rank id ,并且保留别的列的数据无变化

有用户会说,这不是很容易吗 ,直接用 orderBy 不就可以了吗,但是难点是:orderBy完记录下全局ID 并且 保持原来全部列的DF数据

多说无益,遇到这个场景 直接copy 用起来 就知道 有多爽 了,同类问题 我们可以 用下面 这个函数 解决 ~

scala 写的 spark 版本代码:

def dfZipWithIndex(
  df: DataFrame,
  offset: Int = 1,
  colName: String ="rank_id",
  inFront: Boolean = true
) : DataFrame = {
  df.sqlContext.createDataFrame(
    df.rdd.zipWithIndex.map(ln =>
      Row.fromSeq(
        (if (inFront) Seq(ln._2 + offset) else Seq())
          ++ ln._1.toSeq ++
        (if (inFront) Seq() else Seq(ln._2 + offset))
      )
    ),
    StructType(
      (if (inFront) Array(StructField(colName,LongType,false)) else Array[StructField]())
        ++ df.schema.fields ++
      (if (inFront) Array[StructField]() else Array(StructField(colName,LongType,false)))
    )
  )
}

函数调用我们可以用这行代码调用: val ranked_df = dfZipWithIndex(raw_df.orderBy($"predict_score".desc)), 直接复制过去就可以~

python写的 pyspark 版本代码:

from pyspark.sql.types import LongType, StructField, StructType
def dfZipWithIndex (df, offset=1, colName="rank_id"):
    new_schema = StructType(
                    [StructField(colName,LongType(),True)]        # new added field in front
                    + df.schema.fields                            # previous schema
                )
    zipped_rdd = df.rdd.zipWithIndex()
    new_rdd = zipped_rdd.map(lambda (row,rowId): ([rowId +offset] + list(row)))
    return spark.createDataFrame(new_rdd, new_schema)

调用 同理 , 这里我就不在进行赘述了。

(2)分组后保留最大值行

这个函数的 应用场景 就是: 当我们使用 spark 或则 sparkSQL 查找某个 dataframe 数据的时候,在某一天里,任意一个用户可能有多条记录,我们需要 对每一个用户,保留dataframe 中 某列值最大 的那行数据

其中的 关键点 在于:一次性求出对每个用户分组后,求得每个用户的多行记录中,某个值最大的行进行数据保留

当然,经过 简单修改代码,不一定是最大,最小也是可以的,平均都ok

scala 写的 spark 版本代码:

// 得到一天内一个用户多个记录里面时间最大的那行用户的记录
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions
val w = Window.partitionBy("user_id")
val result_df = raw_df
    .withColumn("max_time",functions.max("time").over(w))
    .where($"time" === $"max_time")
    .drop($"max_time")

python写的 pyspark 版本代码:

# pyspark dataframe 某列值最大的元素所在的那一行 
# GroupBy 列并过滤 Pyspark 中某列值最大的行 
# 创建一个Window 以按A列进行分区,并使用它来计算每个组的最大值。然后过滤出行,使 B 列中的值等于最大值 
from pyspark.sql import Window
w = Window.partitionBy('user_id')
result_df = spark.sql(raw_df).withColumn('max_time', fun.max('time').over(w))\
    .where(fun.col('time') == fun.col('time'))
    .drop('max_time')

我们可以看到: 这个函数的关键就是运用了 spark 的 window 函数 ,灵活运用 威力无穷 哦 !

到这里,spark利器2函数之dataframe全局排序id与分组后保留最大值行 的全文 就写完了 ,更多关于spark dataframe全局排序的资料请关注编程网其它相关文章!

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

sparkdataframe全局排序id与分组后保留最大值行

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

sparkdataframe全局排序id与分组后保留最大值行

这篇文章主要为大家介绍了sparkdataframe全局排序id与分组后保留最大值行实现详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
2023-02-09

编程热搜

  • Python 学习之路 - Python
    一、安装Python34Windows在Python官网(https://www.python.org/downloads/)下载安装包并安装。Python的默认安装路径是:C:\Python34配置环境变量:【右键计算机】--》【属性】-
    Python 学习之路 - Python
  • chatgpt的中文全称是什么
    chatgpt的中文全称是生成型预训练变换模型。ChatGPT是什么ChatGPT是美国人工智能研究实验室OpenAI开发的一种全新聊天机器人模型,它能够通过学习和理解人类的语言来进行对话,还能根据聊天的上下文进行互动,并协助人类完成一系列
    chatgpt的中文全称是什么
  • C/C++中extern函数使用详解
  • C/C++可变参数的使用
    可变参数的使用方法远远不止以下几种,不过在C,C++中使用可变参数时要小心,在使用printf()等函数时传入的参数个数一定不能比前面的格式化字符串中的’%’符号个数少,否则会产生访问越界,运气不好的话还会导致程序崩溃
    C/C++可变参数的使用
  • css样式文件该放在哪里
  • php中数组下标必须是连续的吗
  • Python 3 教程
    Python 3 教程 Python 的 3.0 版本,常被称为 Python 3000,或简称 Py3k。相对于 Python 的早期版本,这是一个较大的升级。为了不带入过多的累赘,Python 3.0 在设计的时候没有考虑向下兼容。 Python
    Python 3 教程
  • Python pip包管理
    一、前言    在Python中, 安装第三方模块是通过 setuptools 这个工具完成的。 Python有两个封装了 setuptools的包管理工具: easy_install  和  pip , 目前官方推荐使用 pip。    
    Python pip包管理
  • ubuntu如何重新编译内核
  • 改善Java代码之慎用java动态编译

目录