Scala005-DataFrame中使用UDF

news/2024/7/2 5:31:49

在处理spark.DataFrame时,经常会用到udf,简单做些总结和笔记。

构造数据

import org.apache.spark.sql.functions._
import spark.implicits._
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
Intitializing Scala interpreter ...



Spark Web UI available at 11111111111:4040
SparkContext available as 'sc' (version = 2.4.4, master = local[*], app id = local-1598929668275)
SparkSession available as 'spark'






import org.apache.spark.sql.functions._
import spark.implicits._
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
val builder = SparkSession
      .builder()
      .appName("learningScala")
      .config("spark.executor.heartbeatInterval","60s")
      .config("spark.network.timeout","120s")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("spark.kryoserializer.buffer.max","512m")
      .config("spark.dynamicAllocation.enabled", false)
      .config("spark.sql.inMemoryColumnarStorage.compressed", true)
      .config("spark.sql.inMemoryColumnarStorage.batchSize", 10000)
      .config("spark.sql.broadcastTimeout", 600)
      .config("spark.sql.autoBroadcastJoinThreshold", -1)
      .config("spark.sql.crossJoin.enabled", true)
      .master("local[*]") 
val spark = builder.getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
builder: org.apache.spark.sql.SparkSession.Builder = org.apache.spark.sql.SparkSession$Builder@64837d8
spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@542c0943
var df = Seq(
      ("A", 1, 4,7),
      ("B", 2, 5,8),
      ("C", 3 ,6,9)).toDF("id", "x", "y","z")
df.show(truncate=false)
+---+---+---+---+
|id |x  |y  |z  |
+---+---+---+---+
|A  |1  |4  |7  |
|B  |2  |5  |8  |
|C  |3  |6  |9  |
+---+---+---+---+






df: org.apache.spark.sql.DataFrame = [id: string, x: int ... 2 more fields]
df.printSchema()
root
 |-- id: string (nullable = true)
 |-- x: integer (nullable = false)
 |-- y: integer (nullable = false)
 |-- z: integer (nullable = false)

方法一

该方法对外部可见,可以直接在DataFrame中使用,但是不可以在spark.sql中使用

def add_one(useCol1:Int,useCol2:Int)={
    useCol1+useCol2
}
add_one: (useCol1: Int, useCol2: Int)Int
import org.apache.spark.sql.functions.{udf,col}
val add_one_udf = udf(add_one(_:Int,_:Int))
import org.apache.spark.sql.functions.{udf, col}
add_one_udf: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function2>,IntegerType,Some(List(IntegerType, IntegerType)))
df.withColumn("sum",add_one_udf(col("y"),col("z"))).show(truncate=false)
+---+---+---+---+---+
|id |x  |y  |z  |sum|
+---+---+---+---+---+
|A  |1  |4  |7  |11 |
|B  |2  |5  |8  |13 |
|C  |3  |6  |9  |15 |
+---+---+---+---+---+

方法二

该方法本来应该是在spark.sql中使用的,但是也可以通过callUDF的方式在DataFrame中使用

spark.udf.register("add_one_udf2", add_one _)
res16: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function2>,IntegerType,Some(List(IntegerType, IntegerType)))
import org.apache.spark.sql.functions
df.withColumn("sum", functions.callUDF("add_one_udf2", col("y"),col("z"))).show(truncate=false)
+---+---+---+---+---+
|id |x  |y  |z  |sum|
+---+---+---+---+---+
|A  |1  |4  |7  |11 |
|B  |2  |5  |8  |13 |
|C  |3  |6  |9  |15 |
+---+---+---+---+---+

import org.apache.spark.sql.functions
df.createOrReplaceTempView("df")
spark.sql("select *,add_one_udf2(y,z) AS sum  from df").show()
+---+---+---+---+---+
| id|  x|  y|  z|sum|
+---+---+---+---+---+
|  A|  1|  4|  7| 11|
|  B|  2|  5|  8| 13|
|  C|  3|  6|  9| 15|
+---+---+---+---+---+

                                2020-09-01 狂风骤雨于南京市江宁区九龙湖


http://www.niftyadmin.cn/n/972030.html

相关文章

关闭端口

每一项服务都对应相应的端口&#xff0c;比如众如周知的WWW服务的端口是80&#xff0c;smtp是25&#xff0c;ftp是21&#xff0c;win2000安装中默认的都是这些服务开启的。对于个人用户来说确实没有必要&#xff0c;关掉端口也就是关闭无用的服务。 “控制面板”的“管理工具”…

PyPackage02---Numpy01_flatten用法

Intro 对于numpy中的多维数组&#xff0c;需要将其转换成1维。此时可以用flatten方法。 相关环境和package信息&#xff1a; import sys import pandas as pd import numpy as np print("Python版本&#xff1a;",sys.version) print("numpy版本&#xff1a;&…

一个悲伤的故事

昨天她和我说她和她男友分了&#xff0c;我说不挺好的麽&#xff0c;你都准备回武汉了&#xff0c;她说他家为了多分一套房子逼婚&#xff0c;仅仅只是想让两个人拿证&#xff0c;没谈操办婚事之类的事情&#xff0c;也没过问她家的意见&#xff0c;结果就是两人分了&#xff0…

全局修改数据库字段类型

createPROCp_typeTotypetypetinyint0, --修改方式&#xff0c;0仅查询可修改情况,1仅所有列可修改时才修改,2修改可修改列&#xff0c;报告不可修改列typefromnvarchar(50),typetonvarchar(50)ASSETNOCOUNT ON--查询非unicode列转换为unicode列的可行性SELECTTableNameo.name,…

PythonNote025---conda创建python虚拟环境

Intro 有些情况下&#xff0c;需要用到特殊版本的package&#xff0c;我们可以通过创建虚拟环境的方式定制一个版本供某一个项目使用&#xff0c;保证环境的相对独立性。另一方面&#xff0c;如果我们在集群上执行python任务&#xff0c;虚拟环境的方式可以避免在所有节点都安装…

PysparkNote101---DataFrame行转列

sql里经常会遇到行转列or列转行&#xff0c;如果数据框为{“A”,[1,2])}&#xff0c;需要行转列为{(“A”,1),(“B”,2)}。话不多说&#xff0c;直接看代码。 import pyspark.sql.functions as F from pyspark.sql import SparkSession # 创建SparkSession对象&#xff0c;调用…

Docker(一):入门教程

2013年发布至今&#xff0c; Docker 一直广受瞩目&#xff0c;被认为可能会改变软件行业。 但是&#xff0c;许多人并不清楚 Docker 到底是什么&#xff0c;要解决什么问题&#xff0c;好处又在哪里&#xff1f;本文就来详细解释&#xff0c;帮助大家理解它&#xff0c;还带有简…

关于某些流负载技术

loadbandwith可以单独使用&#xff0c;用来静态指定接口的带宽负载&#xff0c;按照指定的比例进行等值ECMP路径分流RCR本地模式&#xff0c;支持动态调整&#xff0c;但是必须要依靠netstream的检查&#xff0c;周期性进行的&#xff0c;outbound方向的流&#xff0c;以及必须…