现在,Spark提供了可在数据帧中使用的预定义功能,而且似乎已对其进行了高度优化。我最初的问题是哪个更快,但是我自己做了一些测试,发现spark函数至少在一次实例中快了10倍。有谁知道为什么会这样,以及什么时候udf会更快(仅适用于存在相同的spark函数的实例)?

这是我的测试代码(在Databricks社区版上运行):

 # UDF vs Spark function
from faker import Factory
from pyspark.sql.functions import lit, concat
fake = Factory.create()
fake.seed(4321)

# Each entry consists of last_name, first_name, ssn, job, and age (at least 1)
from pyspark.sql import Row
def fake_entry():
  name = fake.name().split()
  return (name[1], name[0], fake.ssn(), fake.job(), abs(2016 - fake.date_time().year) + 1)

# Create a helper function to call a function repeatedly
def repeat(times, func, *args, **kwargs):
    for _ in xrange(times):
        yield func(*args, **kwargs)
data = list(repeat(500000, fake_entry))
print len(data)
data[0]

dataDF = sqlContext.createDataFrame(data, ('last_name', 'first_name', 'ssn', 'occupation', 'age'))
dataDF.cache()
 


UDF函数:

 concat_s = udf(lambda s: s+ 's')
udfData = dataDF.select(concat_s(dataDF.first_name).alias('name'))
udfData.count()
 


Spark功能:

 spfData = dataDF.select(concat(dataDF.first_name, lit('s')).alias('name'))
spfData.count()
 


都运行了两次,udf通常花费约1.1-1.4 s,Spark concat函数始终花费0.15 s以下。

#1 楼


什么时候udf会更快?


如果您询问Python UDF,答案可能永远不会*。由于SQL函数相对简单,并且不是为复杂任务设计的,因此几乎不可能补偿Python解释器与JVM之间重复序列化,反序列化和数据移动的成本。


有谁知道为什么会这样


上面已经列举了主要原因,可以将其简化为一个简单的事实,即Spark DataFrame本质上是一个JVM结构,并且通过对Java的简单调用实现了标准访问方法API。另一方面,UDF是用Python实现的,并且需要来回移动数据。

虽然PySpark通常要求在JVM和Python之间进行数据移动,但是在低级RDD API的情况下,通常不需要昂贵的Serde活动。 Spark SQL增加了序列化和序列化的额外成本,以及在JVM上从不安全的表示移入和移出数据的成本。后一种适用于所有UDF(Python,Scala和Java),而前一种适用于非本机语言。

与UDF不同,Spark SQL函数直接在JVM上运行,并且通常集成良好与催化剂和钨。这意味着可以在执行计划中对其进行优化,并且大多数时候可以受益于codgen和其他钨优化。此外,它们可以以“本机”表示形式对数据进行操作。

因此,从某种意义上讲,这里的问题是Python UDF必须将数据带入代码中,而SQL表达式则相反。 />

*根据粗略估计,PySpark窗口UDF可以击败Scala窗口功能。

评论


很棒的答案,正是我想要的。我不确定这是由于Python-Java之间的数据混排造成的,只是不确定。我感谢这些信息还可以从Catalyst和Tungsten中受益,因此,对于我来说,在代码中尽可能多地实现它们并最小化UDF,对我来说将变得更加重要。话题有点偏离,但是您是否会很快知道numpy功能是否会在Spark Dataframes中使用?这使我的项目之一很大程度上依赖于RDD。

– alfredox
16年7月11日,0:24



我不确定“ numpy功能”到底是什么意思。

– zero323
16年7月11日在2:54

您不能将numpy数组添加为行元素。目前,Spark Rows支持不同的数据类型,例如StringType,BoolType,FloatType,但是您不能在其中保存numpy数组。

– alfredox
16年7月12日在3:29

如果您的意思是功能性的numpy对象,那么绝对不可以。如果您的意思是可以用于存储和检索的列类型,那么VectorUDT几乎就是这个

– zero323
16年7月12日在10:45

“几乎不可能补偿重复序列化,反序列化的成本”。这些天来有PyArrow解决了这个问题。

–马库斯·林德(Marcus Lind)
19年11月27日在13:46

#2 楼

多年之后,当我有了更多火花知识并重新审视了这个问题时,才意识到@alfredox真正想问什么。因此,我再次进行了修订,并将答案分为两个部分:


要回答为何本机DF函数(本机Spark-SQL函数)更快:

,为什么原生Spark函数总是比Spark UDF快,无论您的UDF是用Python还是Scala实现的。

首先,我们需要了解什么是Tungsten,它是Spark 1.4中首次引入的。

它是一个后端,它关注的重点是:



利用二进制内存数据表示(又称为钨行格式)和管理内存的堆外内存管理
明确表示
缓存局部性,它与具有高速缓存命中率的高速缓存感知布局有关的高速缓存感知计算,
全阶段代码生成(aka CodeGen)。


/>
最大的Spark性能杀手之一是GC。 GC将暂停JVM中的每个线程,直到GC完成。这就是为什么引入了堆外内存管理的原因。
执行Spark-SQL本机函数时,数据将保留在钨后端。但是,在Spark UDF场景中,数据将从钨中移出到JVM(Scala场景)或JVM和Python进程(Python)中进行实际处理,然后再移回钨中。结果是:


不可避免地会产生开销:/


反序列化钨的输入。
将输出序列化回钨。


即使使用Scala,Spark是一流的公民,它也会增加JVM中的内存占用,并且可能涉及更多JVM中的GC。
这个问题正是钨的“堆外内存管理”功能试图解决的问题。


回答Python是否一定会比Scala慢:

自2017年10月30日起,Spark刚刚为pyspark引入了矢量化udfs。

https://databricks.com/blog/2017/10/30/introducing-vectorized-udfs-for-pyspark.html

Python UDF速度缓慢的原因可能是PySpark UDF的实现方式不是最优化的:

根据链接中的段落。


Spark在版本0.7中添加了Python API,并提供了支持用于用户定义的功能。这些用户定义的函数一次只能运行一行,因此遭受很高的序列化和调用开销。


但是,新矢量化的udfs似乎可以提高性能。很多:


从3倍到100倍以上。




#3 楼

在恢复使用您自己的自定义UDF函数之前,请尽可能将更高级别的基于列的标准函数与Dataset运算符一起使用,因为UDF是Spark的BlackBox,因此它甚至不尝试对其进行优化。

屏幕后面实际发生的是,Catalyst根本无法处理和优化UDF,并将它们威胁为BlackBox,从而导致失去诸如谓词下推,常量折叠等许多优化。