当函数在对象中时一切正常
当函数在类中时get:
不可序列化的任务:java.io.NotSerializableException:测试
问题是我需要我的代码在类中而不是对象中。知道为什么会这样吗? Scala对象是否已序列化(默认值?)?这是一个有效的代码示例:
object working extends App {
val list = List(1,2,3)
val rddList = Spark.ctx.parallelize(list)
//calling function outside closure
val after = rddList.map(someFunc(_))
def someFunc(a:Int) = a+1
after.collect().map(println(_))
}
这是不起作用的示例:
object NOTworking extends App {
new testing().doIT
}
//adding extends Serializable wont help
class testing {
val list = List(1,2,3)
val rddList = Spark.ctx.parallelize(list)
def doIT = {
//again calling the fucntion someFunc
val after = rddList.map(someFunc(_))
//this will crash (spark lazy)
after.collect().map(println(_))
}
def someFunc(a:Int) = a+1
}
#1 楼
RDD扩展了Serialisable接口,所以这不是导致您的任务失败的原因。现在,这并不意味着您可以使用Spark序列化RDD
并避免使用NotSerializableException
Spark是分布式计算引擎,其主要抽象是弹性分布式数据集(RDD),可以将其视为分布式集合。基本上,RDD的元素在群集的各个节点之间进行分区,但是Spark将其从用户中抽象出来,使用户可以像本地对象一样与RDD(集合)进行交互。
涉及太多细节,但是当您在RDD上运行不同的转换(
map
,flatMap
,filter
等)时,转换代码(关闭)为:在驱动程序节点上序列化,
传送到群集中的相应节点,
反序列化,
并最终在节点上执行
您当然可以在本地运行(如您的示例) ),但所有这些阶段(除通过网络运输外)仍会发生。 [这甚至使您可以在部署到生产之前捕获任何错误]
第二种情况是您正在从map函数内部调用在
testing
类中定义的方法。 Spark看到了这种情况,并且由于无法单独对方法进行序列化,因此Spark尝试对整个testing
类进行序列化,以便在另一个JVM中执行该代码后仍能正常工作。您有两种可能:要么使类测试可序列化,那么整个类都可以由Spark序列化:
import org.apache.spark.{SparkContext,SparkConf}
object Spark {
val ctx = new SparkContext(new SparkConf().setAppName("test").setMaster("local[*]"))
}
object NOTworking extends App {
new Test().doIT
}
class Test extends java.io.Serializable {
val rddList = Spark.ctx.parallelize(List(1,2,3))
def doIT() = {
val after = rddList.map(someFunc)
after.collect().foreach(println)
}
def someFunc(a: Int) = a + 1
}
,或者您使
someFunc
函数而不是方法(函数是Scala中的对象),以便Spark可以对其进行序列化:import org.apache.spark.{SparkContext,SparkConf}
object Spark {
val ctx = new SparkContext(new SparkConf().setAppName("test").setMaster("local[*]"))
}
object NOTworking extends App {
new Test().doIT
}
class Test {
val rddList = Spark.ctx.parallelize(List(1,2,3))
def doIT() = {
val after = rddList.map(someFunc)
after.collect().foreach(println)
}
val someFunc = (a: Int) => a + 1
}
类似,但类的问题不同序列化可能会让您感兴趣,您可以在此Spark Summit 2013演示文稿中进行阅读。
作为附带说明,您可以将
rddList.map(someFunc(_))
重写为rddList.map(someFunc)
,它们是完全相同的。通常,第二个是首选,因为它不那么冗长且更简洁。编辑(2015-03-15):SPARK-5307引入了SerializationDebugger,而Spark 1.3.0是第一个使用它的版本。它将序列化路径添加到NotSerializableException。当遇到NotSerializableException时,调试器将访问对象图以查找无法序列化的对象的路径,并构造信息以帮助用户查找对象。
在OP中,这就是被打印到标准输出:
Serialization stack:
- object not serializable (class: testing, value: testing@2dfe2f00)
- field (class: testing$$anonfun, name: $outer, type: class testing)
- object (class testing$$anonfun, <function1>)
评论
嗯,您所解释的内容当然是有道理的,并解释了为什么整个类都被序列化(这是我没有完全理解的)。不过,我仍然认为rdd不能序列化(它们扩展了Serializable,但这并不意味着它们不会引起NotSerializableException,请尝试一下)。这就是为什么如果将它们放在类之外,则可以修复错误。我将对我的答案进行一些编辑以更精确地理解我的意思-即它们导致异常,而不是它们扩展了接口。
–最好的
2014年3月24日18:51
如果您无法控制该类,则需要可序列化...如果您使用的是Scala,则可以使用Serializable实例化它:val test = new Test with Serializable
– Mark S
2014年8月11日在17:17
“ rddList.map(someFunc(_))与rddList.map(someFunc)完全相同”不,它们并不完全相同,并且实际上使用后者会导致序列化异常,而前者则不会。
–最好的
16-2-23在15:50
@samthebest您能解释一下为什么map(someFunc(_))不会导致序列化异常,而map(someFunc)会导致序列化异常吗?
–阿隆
19/12/15在20:41
#2 楼
Grega的答案很好地解释了原始代码为什么不起作用以及解决该问题的两种方法。但是,这种解决方案不是很灵活。请考虑您的闭包包括您无法控制的非Serializable
类上的方法调用的情况。您既不能将Serializable
标记添加到此类,也不能更改基础实现以将方法更改为函数。 Nilesh为此提供了一个很好的解决方法,但是该解决方案可以变得更加简洁和通用:
def genMapper[A, B](f: A => B): A => B = {
val locker = com.twitter.chill.MeatLocker(f)
x => locker.get.apply(x)
}
此功能序列化器可以然后用于自动包装闭包和方法调用:
rdd map genMapper(someFunc)
此技术的好处还在于,不需要附加的Shark依赖关系即可访问
KryoSerializationWrapper
,因为Twitter的Chill已由核心Spark 引入
评论
嗨,我想知道我是否需要使用您的代码进行注册?我试着从kryo获取了Unable find class异常。谢谢
– G_cy
16年11月11日在7:50
#3 楼
完整的演讲完全解释了这个问题,提出了一种避免这些序列化问题的绝佳范式转换方法:https://github.com/samthebest/dump/blob/master/sams-scala-tutorial/serialization-exceptions-and-memory-泄漏最多的ws.md投票最多的答案基本上是建议放弃整个语言功能-不再使用方法,而仅使用函数。实际上,在函数式编程中应该避免使用类中的方法,但是将它们转换为函数并不能解决这里的设计问题(请参见上面的链接)。
作为在这种特殊情况下的快速修复,您可以使用
@transient
批注告诉它不要尝试序列化有问题的值(此处,Spark.ctx
是一个自定义类,不是Spark的以下OP命名): @transient
val rddList = Spark.ctx.parallelize(list)
还可以重组代码,使rddList驻留在其他位置,但这也很讨厌。
未来可能是孢子
/>
将来,Scala将包括称为“孢子”的东西,这应该使我们能够精细地控制封闭产生的作用和不产生的作用。此外,这应该将所有误将不可序列化的类型(或任何不需要的值)拉入编译错误的错误,而不是现在的错误,这是可怕的运行时异常/内存泄漏。
http://docs.scala -lang.org/sips/pending/spores.html
有关Kryo序列化的提示
使用kyro时,使其必须注册,这意味着您会错误,而不是内存泄漏:
”最后,我知道kryo具有kryo.setRegistrationOptional(true),但是我很难弄清楚如何使用它。当打开此选项时继续,如果我还没有注册类,kryo似乎仍然会抛出异常。“
使用kryo注册类的策略
当然,这只能为您提供类型级别的控制,值级控制。
...还有更多想法。
#4 楼
我用另一种方法解决了这个问题。您只需要在通过闭包之前对对象进行序列化,然后再进行反序列化即可。即使您的课程不是可序列化的,这种方法也行得通,因为它在后台使用了Kryo。您所需要的只是一些咖喱。 ;)这是我的工作方式示例:
def genMapper(kryoWrapper: KryoSerializationWrapper[(Foo => Bar)])
(foo: Foo) : Bar = {
kryoWrapper.value.apply(foo)
}
val mapper = genMapper(KryoSerializationWrapper(new Blah(abc))) _
rdd.flatMap(mapper).collectAsMap()
object Blah(abc: ABC) extends (Foo => Bar) {
def apply(foo: Foo) : Bar = { //This is the real function }
}
随意使Blah变得像您想要的那样复杂,类,同伴对象,嵌套类,对多个第三方库的引用。
KryoSerializationWrapper引用:https://github.com/amplab/shark/blob/master/src/main/scala/shark/execution/序列化/KryoSerializationWrapper.scala
评论
这实际上是序列化实例还是创建静态实例并序列化引用(请参阅我的答案)。
–最好的
2014年7月2日,12:25
@samthebest您能详细说说吗?如果您调查KryoSerializationWrapper,您会发现它使Spark认为它确实是java.io.Serializable-它仅使用Kryo在内部对对象进行序列化-更快,更简单。而且我不认为它处理静态实例-只是在调用value.apply()时将值反序列化。
– Nilesh
2014年7月2日在18:07
#5 楼
我遇到了类似的问题,我从Grega的答案中了解到的是object NOTworking extends App {
new testing().doIT
}
//adding extends Serializable wont help
class testing {
val list = List(1,2,3)
val rddList = Spark.ctx.parallelize(list)
def doIT = {
//again calling the fucntion someFunc
val after = rddList.map(someFunc(_))
//this will crash (spark lazy)
after.collect().map(println(_))
}
def someFunc(a:Int) = a+1
}
您的doIT方法正在尝试序列化someFunc(_)方法,但是由于该方法不可序列化,它会尝试对无法再次进行序列化的类测试进行序列化。
因此,使您的代码正常工作,您应该在doIT方法内定义someFunc。例如:
def doIT = {
def someFunc(a:Int) = a+1
//function definition
}
val after = rddList.map(someFunc(_))
after.collect().map(println(_))
}
如果出现了多个功能,则所有这些功能都应可用于父级上下文。
#6 楼
我不确定这是否适用于Scala,但是在Java中,我通过重构代码解决了NotSerializableException
,以便闭包不会访问不可序列化的final
字段。评论
我在Java中遇到了同样的问题,我试图在RDD foreach方法中使用Java IO包中的FileWriter类。您能否让我知道我们如何解决这个问题。
– Shankar
15年7月23日在8:46
@Shankar,如果FileWriter是外部类的最后一个字段,则不能这样做。但是FileWriter可以由可串行化的字符串或文件构造。因此,根据外部类的文件名,重构代码以构造本地FileWriter。
–Trebor粗鲁
15年7月23日在16:10
#7 楼
仅供参考,Spark 2.4中的许多人可能会遇到此问题。 Kryo序列化已经变得更好,但是在许多情况下,您不能使用spark.kryo.unsafe = true或朴素的kryo序列化器。要快速修复,请尝试在Spark配置中更改以下内容
spark.kryo.unsafe="false"
OR
spark.serializer="org.apache.spark.serializer.JavaSerializer"
我通过使用显式广播变量并利用新的内置twitter-chill api修改遇到或亲自编写的自定义RDD转换,将它们从
rdd.map(row =>
转换为rdd.mapPartitions(partition => {
函数。示例
旧的(不是很好的)方式
val sampleMap = Map("index1" -> 1234, "index2" -> 2345)
val outputRDD = rdd.map(row => {
val value = sampleMap.get(row._1)
value
})
替代(更好) )方法
import com.twitter.chill.MeatLocker
val sampleMap = Map("index1" -> 1234, "index2" -> 2345)
val brdSerSampleMap = spark.sparkContext.broadcast(MeatLocker(sampleMap))
rdd.mapPartitions(partition => {
val deSerSampleMap = brdSerSampleMap.value.get
partition.map(row => {
val value = sampleMap.get(row._1)
value
}).toIterator
})
此新方法将仅对每个分区调用一次广播变量,即更好。如果不注册类,您仍然需要使用Java序列化。
#8 楼
def upper(name: String) : String = {
var uppper : String = name.toUpperCase()
uppper
}
val toUpperName = udf {(EmpName: String) => upper(EmpName)}
val emp_details = """[{"id": "1","name": "James Butt","country": "USA"},
{"id": "2", "name": "Josephine Darakjy","country": "USA"},
{"id": "3", "name": "Art Venere","country": "USA"},
{"id": "4", "name": "Lenna Paprocki","country": "USA"},
{"id": "5", "name": "Donette Foller","country": "USA"},
{"id": "6", "name": "Leota Dilliard","country": "USA"}]"""
val df_emp = spark.read.json(Seq(emp_details).toDS())
val df_name=df_emp.select($"id",$"name")
val df_upperName= df_name.withColumn("name",toUpperName($"name")).filter("id='5'")
display(df_upperName)
会出现错误
org.apache.spark.SparkException:任务无法序列化
在org.apache.spark.util.ClosureCleaner $ .ensureSerializable(ClosureCleaner.scala:304)
解决方案-
import java.io.Serializable;
object obj_upper extends Serializable {
def upper(name: String) : String =
{
var uppper : String = name.toUpperCase()
uppper
}
val toUpperName = udf {(EmpName: String) => upper(EmpName)}
}
val df_upperName=
df_name.withColumn("name",obj_upper.toUpperName($"name")).filter("id='5'")
display(df_upperName)
#9 楼
我也有类似的经历。在驱动程序(主)上初始化变量时触发了错误,但随后尝试在其中一个工作程序上使用它。
发生这种情况时,Spark Streaming将尝试将该对象序列化以将其发送给工作程序,如果该对象不可序列化,则会失败。
我通过将变量设为静态解决了该错误。
先前的无效代码
private final PhoneNumberUtil phoneUtil = PhoneNumberUtil.getInstance();
工作代码
private static final PhoneNumberUtil phoneUtil = PhoneNumberUtil.getInstance();
信用:
https://docs.microsoft.com/en-us/answers/questions/35812/sparkexception- job-aborted-due-to-stage-failure-ta.html(pradeepcheekatla-msft的答案)
https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/troubleshooting/javaionotserializableexception .html
评论
什么是Spark.ctx?没有使用ctx AFAICT方法的Spark对象