如何在单个Job中使用Spark写入取决于键的多个输出。

相关内容:通过Scalding Hadoop,一个MapReduce Job的键对多个输出进行写

Eg

sc.makeRDD(Seq((1, "a"), (1, "b"), (2, "c")))
.writeAsMultiple(prefix, compressionCodecOption)


将确保cat prefix/1

a
b


,而cat prefix/2将为

c


编辑:我最近添加了一个新的答案,其中包括完整的导入,pimp和压缩编解码器,请参阅https://stackoverflow.com/a/46118044/1586965,这可能对您有所帮助较早的答案。

评论

您能否添加一个示例来说明您的追求?也许是输入集合以及您期望的过程/输出是什么?

不,已完成maasg的请求@fengyun,“完成”。我们有一个用例,特别是当前,我们有一个使用TemplatedTsv的Scalding作业,我们希望将其替换为Spark作业。但是重构一直在我们的待办事项中……当我最终写出一个完整的基于hdfs的解决方案时,我一定会在这里发布它。

这个例子有帮助吗?我正在尝试自己弄清楚该怎么做。

仅供参考:我已打开SPARK-3533,要求将更简单的方法添加到Spark中。

#1 楼

如果使用Spark 1.4+,这要归功于DataFrame API,它变得非常容易。 (DataFrame是在Spark 1.3中引入的,而我们需要的partitionBy()是在1.4中引入的。)

如果您是从RDD开始的,则首先需要将其转换为DataFrame :

val people_rdd = sc.parallelize(Seq((1, "alice"), (1, "bob"), (2, "charlie")))
val people_df = people_rdd.toDF("number", "name")


在Python中,相同的代码是:

people_rdd = sc.parallelize([(1, "alice"), (1, "bob"), (2, "charlie")])
people_df = people_rdd.toDF(["number", "name"])


一旦有了DataFrame,就编写基于特定键将多个输出转换为简单的方法。更重要的是-这就是DataFrame API的优点-在Python,Scala,Java和R中,代码几乎相同:

people_df.write.partitionBy("number").text("people")


如果需要,可以轻松使用其他输出格式:

people_df.write.partitionBy("number").json("people-json")
people_df.write.partitionBy("number").parquet("people-parquet")


在每个示例中,Spark都会为分区了DataFrame的每个键创建一个子目录上:

people/
  _SUCCESS
  number=1/
    part-abcd
    part-efgh
  number=2/
    part-abcd
    part-efgh


评论


您可以在Scala中添加等效的数据集代码吗?我会接受为最佳答案。是的,有些人不在乎类型,喜欢每隔几分钟运行整个应用程序以查找是否有错误,但是我们中的一些人喜欢在键入时立即捕获诸如“ nubmer”之类的拼写错误:)回答。

–最好的
16年5月12日在17:28

@samthebest-仅供参考,我回滚了您的编辑,因为它存在一些问题:它不符合我的写作风格;我对数据集了解不多,因此有关数据集[SomeCaseClass]的注释更适合作为注释;最后,Python没有makeRDD()方法。

–尼克·查马斯(Nick Chammas)
16年5月14日在16:23

请注意,如果您有Dataset [SomeCaseClass],则只需调用.toDF(),列标签将与SomeCaseClasses字段匹配。这提供了更多的类型安全性。

–最好的
16年5月18日在14:10

有什么方法可以强制这种方法每个分区只写入一个文件/部分?

– moustachio
16年5月21日在18:06

@moustachio-好问题。我认为您可以通过将DataFrame合并到partitionBy()之前的一个分区中来强制执行此操作。例如:people_df.coalesce(1).write.partitionBy(“ number”)。text(“ people”)但是,这可能会限制写入数据时Spark的并行性,具体取决于您的数据和集群配置。

–尼克·查马斯(Nick Chammas)
16年5月23日在1:25

#2 楼

我会这样做,因为它可以伸缩

import org.apache.hadoop.io.NullWritable

import org.apache.spark._
import org.apache.spark.SparkContext._

import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat

class RDDMultipleTextOutputFormat extends MultipleTextOutputFormat[Any, Any] {
  override def generateActualKey(key: Any, value: Any): Any = 
    NullWritable.get()

  override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String = 
    key.asInstanceOf[String]
}

object Split {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Split" + args(1))
    val sc = new SparkContext(conf)
    sc.textFile("input/path")
    .map(a => (k, v)) // Your own implementation
    .partitionBy(new HashPartitioner(num))
    .saveAsHadoopFile("output/path", classOf[String], classOf[String],
      classOf[RDDMultipleTextOutputFormat])
    spark.stop()
  }
}


上面刚刚看到了类似的答案,但实际上我们不需要自定义分区。 MultipleTextOutputFormat将为每个键创建文件。可以将具有相同键的多个记录放入同一分区。

新的HashPartitioner(num),其中num是所需的分区号。如果您有大量不同的键,可以将数字设置为big。在这种情况下,每个分区将不会打开太多的hdfs文件处理程序。

评论


请您添加所有必要的导入语句吗?我没有测试过,但是接受答案似乎是我想要的。 partitionBy(new Hashpartitioner(num))有什么意义?这和repartition(num)不同吗?

–最好的
2014-09-27 12:48

它是不同的。哈希分区将确保所有具有相同键的记录都转到相同分区。我记得重新分区没有此功能。

–张展
2014年9月28日在0:57

非常感谢这个非常好的解决方案。我只是想知道以下内容:我应该如何修改您的代码,以使每个文件的输出按值v排序?

– Yanis Gkoufas
15年1月31日在20:31

我一直在寻找编写多个实木复合地板输出的方法,而沿着这条线的解决方案看起来很有希望(仅直接子类化MultipleOutputFormat,而不使用MultipleTextOutputFormat)。不幸的是,MutlipleOutputFormat仅存在于旧的API MR1 / mapred中,而AvroParquetOutputFormat和ParquetOutputFormat(支持镶木地板)是针对新API MR2 / mapreduce编写的,因此似乎未打开相同的路径...

– silasdavis
2015年7月3日15:45



看起来很棒!有python等效项吗?

– NDavis
16年2月24日,0:03

#3 楼

如果给定密钥可能有很多值,我认为可扩展的解决方案是每个分区的每个密钥写出一个文件。不幸的是,Spark中没有对此的内置支持,但是我们可以解决一下。

(用选择的分布式文件系统操作代替PrintWriter。)

这会在RDD上进行一次传递,并且不会进行随机播放。它为每个键提供一个目录,每个目录中都有许多文件。

评论


谢谢。如果我们在哪里使用HDFS而不是本地文件系统,那么实际上我们将自己亲自实现改组部分,对吗?另外,当多个分区包含具有相同密钥的对时会发生什么?这两个任务都可能尝试写入同一文件,因此我们需要某种同步的文件管理系统来跟踪创建XXXXX部分。鉴于我确定存在使用MultipleOutputFormat的解决方案,因此该解决方案感到很脏。

–最好的
14年6月21日在16:02

没错,这是一种实施改组。但是我认为没有瓶颈。没有单个节点正在接收带有密钥的所有记录。来自多个分区的同一密钥没有问题,也不需要同步。文件名是输出/ <键> / <分区>。因此,每个分区写入不同的文件。 (分区索引转到示例中的后缀。)

–丹尼尔(Daniel Darabos)
14年6月21日在17:33

MultipleOutputFormat听起来很适合这项工作,并且可以按照相同的想法工作。我只是从未使用过。我认为您只是重写MultiWriter以使用MultipleOutputFormat而不是滚动其自己的key-> file映射。但是mapPartitionsWithIndex位将基本保持不变。

–丹尼尔(Daniel Darabos)
14年6月21日在17:36

抱歉,我误解了您的解决方案(我略读了一下)。感谢您的澄清。是的,我认为可以进行一些尝试,并用HDFS替换编写器代码,这将起作用(也没有瓶颈)。感谢您的回答。

–最好的
14年6月22日在12:38

我担心当我们使用mapPartitionsWithIndex并手动写入HDFS时,该特定分区不一定会输出到该分区的所需位置。因此,额外的混洗是不必要的,并且可以避免。

–最好的
2014年8月12日上午10:42

#4 楼

这包括请求的编解码器,必要的导入和请求的pimp。

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext

// TODO Need a macro to generate for each Tuple length, or perhaps can use shapeless
implicit class PimpedRDD[T1, T2](rdd: RDD[(T1, T2)]) {
  def writeAsMultiple(prefix: String, codec: String,
                      keyName: String = "key")
                     (implicit sqlContext: SQLContext): Unit = {
    import sqlContext.implicits._

    rdd.toDF(keyName, "_2").write.partitionBy(keyName)
    .format("text").option("codec", codec).save(prefix)
  }
}

val myRdd = sc.makeRDD(Seq((1, "a"), (1, "b"), (2, "c")))
myRdd.writeAsMultiple("prefix", "org.apache.hadoop.io.compress.GzipCodec")


与OP的一个细微差别是,它将<keyName>=前缀到目录名称。例如,

myRdd.writeAsMultiple("prefix", "org.apache.hadoop.io.compress.GzipCodec")


给出:

prefix/key=1/part-00000
prefix/key=2/part-00000


其中prefix/my_number=1/part-00000包含行ab以及prefix/my_number=2/part-00000将包含行c



myRdd.writeAsMultiple("prefix", "org.apache.hadoop.io.compress.GzipCodec", "foo")


将给出:

prefix/foo=1/part-00000
prefix/foo=2/part-00000


应该清楚如何编辑parquet

最后,下面是Dataset的示例,也许比使用元组更好。

implicit class PimpedDataset[T](dataset: Dataset[T]) {
  def writeAsMultiple(prefix: String, codec: String, field: String): Unit = {
    dataset.write.partitionBy(field)
    .format("text").option("codec", codec).save(prefix)
  }
}


评论


不确定它没有+100赞成票,而实际上有零赞成票。非常有帮助,谢谢!

–阿里奥斯塔德
17年12月20日在20:49



@Aliostad,看看日期,这是一年半之后发布的。另外,在SO中发布自己的问题的答案(在已经有一个或多个有效问题之后)并接受它并不是惯例(有时被认为是不礼貌的)。有时情况需要多个答案,但是您通常会保留原始答案(除非事实证明是错误的,或者来自另一个用户的新答案好得多,但事实并非如此,OP明确认为原答案正确)。在这种情况下,我只能假设OP不了解该准则。

–阿贝尔
18/12/15在20:55



@Abel我知道准则,但是我觉得有必要发布自己的答案,因为我的答案“比这里要好得多”,因为这是唯一的答案:1.包括如何指定压缩编解码器(如在OP中要求); 2。包括如何将其添加为pimp / extension方法(在OP中要求); 3。实际编译! (包括必要的导入),4.使用正确的Scala样式和格式。令人遗憾的是,快到2019年了,并不是每个人都可以编写可编译的代码,也不是正确的样式。

–最好的
18/12/17在8:30

最高的答案实际上是最好的,看来您基本上是在模仿他。

– JP Silvashy
19年11月17日在17:33

@JPSilvashy我确实尝试编辑答案,以便它1.包括如何指定压缩编解码器(按OP的要求),2.包括如何将其添加为pimp / extension方法(按OP的要求), 3.实际编译! (包括必要的导入),4.使用正确的Scala样式和格式。张贴者拒绝了我的编辑,因此我创建了一个新答案。至少有十几个人发现我的答案比最重要的答案更有帮助。

–最好的
1月23日9:02

#5 楼

我有类似的需求,找到了一种方法。但这有一个缺点(对我而言,这不是问题):您需要对每个输出文件使用一个分区对数据进行重新分区。

以这种方式进行分区通常需要事先知道该作业将输出多少个文件,并找到一个将每个键映射到每个分区的函数。

首先让我们创建基于MultipleTextOutputFormat的类:

import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat

class KeyBasedOutput[T >: Null, V <: AnyRef] extends MultipleTextOutputFormat[T , V] {
  override def generateFileNameForKeyValue(key: T, value: V, leaf: String) = {
    key.toString
  }
  override protected def generateActualKey(key: T, value: V) = {
    null
  }
}


使用此类,Spark将从分区(我想是第一个/最后一个)中获取一个密钥,并使用该密钥命名文件,因此在同一分区上混合多个密钥不是很好。 br />例如,您将需要一个自定义分区程序。这样就可以了:

import org.apache.spark.Partitioner

class IdentityIntPartitioner(maxKey: Int) extends Partitioner {
  def numPartitions = maxKey

  def getPartition(key: Any): Int = key match {
    case i: Int if i < maxKey => i
  }
}


现在我们将所有内容放在一起:

val rdd = sc.makeRDD(Seq((1, "a"), (1, "b"), (2, "c"), (7, "d"), (7, "e")))

// You need to know the max number of partitions (files) beforehand
// In this case we want one partition per key and we have 3 keys,
// with the biggest key being 7, so 10 will be large enough
val partitioner = new IdentityIntPartitioner(10)

val prefix = "hdfs://.../prefix"

val partitionedRDD = rdd.partitionBy(partitioner)

partitionedRDD.saveAsHadoopFile(prefix,
    classOf[Integer], classOf[String], classOf[KeyBasedOutput[Integer, String]])


这将生成3前缀(分别为1、2和7)下的文件,一次处理所有内容。

如您所见,您需要一些有关密钥的知识才能使用此解决方案。

对我来说,这很容易,因为每个密钥哈希都需要一个输出文件,并且文件的数量在我的控制之下,所以我可以使用常规的HashPartitioner来实现此目的。

评论


到目前为止,这无疑是最好的解决方案,并且似乎可以解决问题。我有点担心,这将导致每个密钥一个文件,这将导致大型数据集出现问题。如果您可以修改答案,以便每个键的输出文件数是可配置的,我将不胜感激。

–最好的
2014年6月20日9:09

@samthebest,我可以做到,但这将是一个非常具体的解决方案。您能否更新问题以说您想要每个键多个输出文件?顺便说一句,您真的在工作中使用整数键吗?

– Douglaz
14年6月20日在21:39

好吧,任何在分区上有意义的键-当我们在其上调用toString时,这是合理的。我不确定是否需要更新答案,因为在HDFS上生成大文件是众所周知的坏习惯,因为它限制了您可以使用的压缩类型。如果我们有非常大的文件,并且我们必须选择一个可拆分的压缩算法,这可能不是最适合手头的工作。此外,由于Hadoop中的错误,Spark当前无法读取bzip2(我最喜欢的可拆分压缩)。不过,我将答案更新为明确的。再次非常感谢。

–最好的
2014年6月21日,11:13

此解决方案将所有数据都放在一个节点上,如果它们都具有相同的密钥,对吗?似乎有损于其总体可伸缩性。

–丹尼尔(Daniel Darabos)
14年6月21日在13:14

@DanielDarabos点是正确的。当然,可以对IdentityIntPartitioner进行调整,以使每个可能的键都有多个分区,例如M,其中一个是随机选择的。我们将需要使用哈希函数并通过numPartitions对结果取模,尽管那样会出现问题-不同的键可能最终出现在同一个分区中,我认为这会破坏saveAsHadoopFile吗?这是一个不平凡的问题。

–最好的
14年6月21日在16:40

#6 楼

我在Java中也需要同样的东西。将我的张湛Scala答案的翻译发布给Spark Java API用户:

import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.Arrays;


class RDDMultipleTextOutputFormat<A, B> extends MultipleTextOutputFormat<A, B> {

    @Override
    protected String generateFileNameForKeyValue(A key, B value, String name) {
        return key.toString();
    }
}

public class Main {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("Split Job")
                .setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        String[] strings = {"Abcd", "Azlksd", "whhd", "wasc", "aDxa"};
        sc.parallelize(Arrays.asList(strings))
                // The first character of the string is the key
                .mapToPair(s -> new Tuple2<>(s.substring(0,1).toLowerCase(), s))
                .saveAsHadoopFile("output/", String.class, String.class,
                        RDDMultipleTextOutputFormat.class);
        sc.stop();
    }
}


#7 楼

saveRDS数据是基于RDD数据实现的,特别是通过以下方法实现的:PairRDD.saveAsHadoopDataset,它从执行对的PairRdd中获取数据。
我看到两个可能的选择:如果您的数据相对较小,您可以通过对RDD进行分组,从每个集合中创建一个新的RDD并使用该RDD写入数据来节省一些实现时间。像这样的东西:

val byKey = dataRDD.groupByKey().collect()
val rddByKey = byKey.map{case (k,v) => k->sc.makeRDD(v.toSeq)}
val rddByKey.foreach{ case (k,rdd) => rdd.saveAsText(prefix+k}


请注意,它不适用于大型数据集,因为b / c在v.toSeq处的迭代器的实现可能不适合内存。

我看到的另一种选择,实际上是我在这种情况下推荐的选择:通过直接调用hadoop / hdfs api自己滚动。

这是我在研究此问题:
如何从另一个RDD创建RDD?

评论


是的,我想使用hadoop / hdfs api-即使用MultipleOutputFormat,但是我想知道如何做到这一点。

–最好的
2014年6月4日上午10:30

您不能在另一个RDD(第二行)中创建一个RDD。看到这个pptslideshare.net/databricks/…

–阿德里安
2015年3月25日14:07



@Adrian,您是对的。我在那儿缺少收藏品。

–马斯格
2015年3月25日14:18在

#8 楼

我有一个类似的用例,其中我根据一个密钥(每个密钥1个文件)将Hadoop HDFS上的输入文件拆分为多个文件。这是我的spark的scala代码。

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

val hadoopconf = new Configuration();
val fs = FileSystem.get(hadoopconf);

@serializable object processGroup {
    def apply(groupName:String, records:Iterable[String]): Unit = {
        val outFileStream = fs.create(new Path("/output_dir/"+groupName))
        for( line <- records ) {
                outFileStream.writeUTF(line+"\n")
            }
        outFileStream.close()
    }
}
val infile = sc.textFile("input_file")
val dateGrouped = infile.groupBy( _.split(",")(0))
dateGrouped.foreach( (x) => processGroup(x._1, x._2))


我已根据密钥对记录进行了分组。每个密钥的值都写入单独的文件。

评论


这看起来是一个很好的解决方案,特别是因为它处理结果的可迭代对象,我得到了org.apache.spark.SparkException:任务无法序列化,您认为fs实例导致了此问题吗?

– perrohunter
15年12月17日在19:34

我喜欢这种解决方案,因为它不使用DataFrames。这个对我有用。我担心每个组仅写入1个文件,这对于大型数据集可能会造成麻烦,对吧?例如我的群组大约有150MB,这很好...

–eggie5
19年8月21日在17:02

我认为此解决方案不适用于每个键中的大量数据

– Ayoub Omari
10月27日15:29

#9 楼

如果您有多列并且要保存所有未按csv格式分区的其他列,这对于python用户来说是个好消息,如果您使用Nick的建议使用“文本”方法,这将失败。

people_df.write.partitionBy("number").text("people") 
错误消息是“ AnalysisException:u'Text数据源仅支持单列,并且您有2列。;“”

在spark 2.0.0中(我测试环境是hdp的spark 2.0.0)软件包“ com.databricks.spark.csv”现已集成,它允许我们保存仅一列分区的文本文件,请参见示例打击:

people_rdd = sc.parallelize([(1,"2016-12-26", "alice"),
                             (1,"2016-12-25", "alice"),
                             (1,"2016-12-25", "tom"), 
                             (1, "2016-12-25","bob"), 
                             (2,"2016-12-26" ,"charlie")])
df = people_rdd.toDF(["number", "date","name"])

df.coalesce(1).write.partitionBy("number").mode("overwrite").format('com.databricks.spark.csv').options(header='false').save("people")

[root@namenode people]# tree
.
├── number=1
│?? └── part-r-00000-6bd1b9a8-4092-474a-9ca7-1479a98126c2.csv
├── number=2
│?? └── part-r-00000-6bd1b9a8-4092-474a-9ca7-1479a98126c2.csv
└── _SUCCESS

[root@namenode people]# cat number\=1/part-r-00000-6bd1b9a8-4092-474a-9ca7-1479a98126c2.csv
2016-12-26,alice
2016-12-25,alice
2016-12-25,tom
2016-12-25,bob
[root@namenode people]# cat number\=2/part-r-00000-6bd1b9a8-4092-474a-9ca7-1479a98126c2.csv
2016-12-26,charlie


在我的Spark 1.6.1环境中,代码没有引发任何错误,但是仅生成了一个文件。它没有被两个文件夹分区。

希望这会有所帮助。

#10 楼

我有一个类似的用例。我通过编写两个实现MultipleTextOutputFormatRecordWriter的自定义类在Java中解决了该问题。

我的输入是JavaPairRDD<String, List<String>>,我想将其存储在以其键命名的文件中,所有行都包含在其值中。

这是我的代码MultipleTextOutputFormat实现

class RDDMultipleTextOutputFormat<K, V> extends MultipleTextOutputFormat<K, V> {

    @Override
    protected String generateFileNameForKeyValue(K key, V value, String name) {
        return key.toString(); //The return will be used as file name
    }

    /** The following 4 functions are only for visibility purposes                 
    (they are used in the class MyRecordWriter) **/
    protected String generateLeafFileName(String name) {
        return super.generateLeafFileName(name);
    }

    protected V generateActualValue(K key, V value) {
        return super.generateActualValue(key, value);
    }

    protected String getInputFileBasedOutputFileName(JobConf job,     String name) {
        return super.getInputFileBasedOutputFileName(job, name);
        }

    protected RecordWriter<K, V> getBaseRecordWriter(FileSystem fs, JobConf job, String name, Progressable arg3) throws IOException {
        return super.getBaseRecordWriter(fs, job, name, arg3);
    }

    /** Use my custom RecordWriter **/
    @Override
    RecordWriter<K, V> getRecordWriter(final FileSystem fs, final JobConf job, String name, final Progressable arg3) throws IOException {
    final String myName = this.generateLeafFileName(name);
        return new MyRecordWriter<K, V>(this, fs, job, arg3, myName);
    }
} 


这是我RecordWriter实现的代码。

class MyRecordWriter<K, V> implements RecordWriter<K, V> {

    private RDDMultipleTextOutputFormat<K, V> rddMultipleTextOutputFormat;
    private final FileSystem fs;
    private final JobConf job;
    private final Progressable arg3;
    private String myName;

    TreeMap<String, RecordWriter<K, V>> recordWriters = new TreeMap();

    MyRecordWriter(RDDMultipleTextOutputFormat<K, V> rddMultipleTextOutputFormat, FileSystem fs, JobConf job, Progressable arg3, String myName) {
        this.rddMultipleTextOutputFormat = rddMultipleTextOutputFormat;
        this.fs = fs;
        this.job = job;
        this.arg3 = arg3;
        this.myName = myName;
    }

    @Override
    void write(K key, V value) throws IOException {
        String keyBasedPath = rddMultipleTextOutputFormat.generateFileNameForKeyValue(key, value, myName);
        String finalPath = rddMultipleTextOutputFormat.getInputFileBasedOutputFileName(job, keyBasedPath);
        Object actualValue = rddMultipleTextOutputFormat.generateActualValue(key, value);
        RecordWriter rw = this.recordWriters.get(finalPath);
        if(rw == null) {
            rw = rddMultipleTextOutputFormat.getBaseRecordWriter(fs, job, finalPath, arg3);
            this.recordWriters.put(finalPath, rw);
        }
        List<String> lines = (List<String>) actualValue;
        for (String line : lines) {
            rw.write(null, line);
        }
    }

    @Override
    void close(Reporter reporter) throws IOException {
        Iterator keys = this.recordWriters.keySet().iterator();

        while(keys.hasNext()) {
            RecordWriter rw = (RecordWriter)this.recordWriters.get(keys.next());
            rw.close(reporter);
        }

        this.recordWriters.clear();
    }
}


大多数代码与FileOutputFormat中的代码完全相同。唯一的区别是那几行

List<String> lines = (List<String>) actualValue;
for (String line : lines) {
    rw.write(null, line);
}


这些行使我可以将输入List<String>的每一行写到文件上。为了避免在每行上写密钥,write函数的第一个参数设置为null

要完成此操作,我只需要执行此调用来写入我的文件

javaPairRDD.saveAsHadoopFile(path, String.class, List.class, RDDMultipleTextOutputFormat.class);