opebet网页版-bet亚洲官网手机版-登录

教育行业A股IPO第一股(股票代码 003032)

全国咨询/投诉热线:400-618-4000

传智教育|传智播客

一样的教育,不一样的品质

全国校区

 

RDD是如何操作数据转换的?RDD转换算子API示例

更新时间:2020年12月21日18时35分 来源:传智教育 浏览次数:

好口碑IT培训

  RDD处理过程中的“转换”操作主要用于根据已有RDD创建新的RDD,每一次通过Transformation算子计算后都会返回一个新RDD,供给下一个转换算子使用。下面,通过一张表来列举一些常用转换算子操作的API,如表1所示。

  表1 常用的转换算子API

1608546329889_21.png

  下面,大家通过结合具体的示例对这些转换算子API进行详细讲解。

  ·filter(func)

  filter(func)操作会筛选出满足函数func的元素,并返回一个新的数据集。假设,有一个文件test.txt(内容如文件3-1),下面,通过一张图来描述如何通过filter算子操作,筛选出包含单词“spark”的元素,具体过程如图1所示。

1608545555238_22.jpg

   图1 filter算子操作

  在图1中,通过从test.txt文件中加载数据的方式创建RDD,然后通过filter操作筛选出满足条件的元素,这些元素组成的集合是一个新的RDD。接下来,通过代码来进行演示,具体代码如下:

  scala> val lines = sc.textFile("file:///export/data/test.txt")
  lines: org.apache.spark.rdd.RDD[String] = `[file:///export/data/test.txt]`(file:///\\export\data\test.txt)
              MapPartitionsRDD[1] at textFile at :24
  scala> val linesWithSpark = lines.filter(line => line.contains("spark"))
  linesWithSpark: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at
                             filter at :25

  在上述代码中,filter()输入的参数line => line.contains(“spark”)是一个匿名函数,其含义是依次取出lines这个RDD中的每一个元素,对于当前取到的元素,把它赋值给匿名函数中的line变量。若line中包含“spark”单词,就把这个元素加入到RDD(即linesWithSpark)中,否则就丢弃该元素。

  ·map(func)

  map(func)操作将每个元素传递到函数func中,并将结果返回为一个新的数据集。假设,有一个文件test.txt(内容如文件1),接下来,通过一张图来描述如何通过map算子操作把文件内容拆分成一个个的单词并封装在数组对象中,具体过程如图2所示。

1608545566832_23.jpg

 图2 map算子操作

  在图2中,通过从test.txt文件中加载数据的方式创建RDD,然后通过map操作将文件的每一行内容都拆分成一个个的单词元素,这些元素组成的集合是一个新的RDD。接下来,通过代码来进行演示,具体代码如下:

  scala> val lines = sc.textFile("file:///export/data/test.txt")
  lines: org.apache.spark.rdd.RDD[String] = [file:///export/data/test.txt](file:///\\export\data\test.txt)
              MapPartitionsRDD[4] at textFile at :24
  scala> val words = lines.map(line => line.split(" "))
  words: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[13] at
                             map at :25

  上述代码中,lines.map(line => line.split(“ ”))含义是依次取出lines这个RDD中的每个元素,对于当前取到的元素,把它赋值给匿名函数中的line变量。由于line是一行文本,如“hadoop spark”,一行文本中包含多个单词,且空格进行分隔,通过line.split(“ ”)匿名函数,将文本分成一个个的单词,拆分后得到的单词都被封装到一个数组对象中,成为新的RDD(即words)的一个元素。

  ·flatMap(func)

  flatMap(func)与map(func)相似,但是每个输入的元素都可以映射到0或者多个输出的结果。有一个文件test.txt(内容如文件3-1),接下来,通过一张图来描述如何通过flatMap算子操作,把文件内容拆分成一个个的单词,具体过程如图3所示。

1608545586788_24.jpg

图3 flatMap算子操作

  在图3中,通过从test.txt文件中加载数据的方式创建RDD,然后通过flatMap操作将文件的每一行内容都拆分成一个个的单词元素,这些元素组成的集合是一个新的RDD。接下来,通过代码来进行演示,具体代码如下:

 scala> val lines = sc.textFile("file:///export/data/test.txt")
  lines: org.apache.spark.rdd.RDD[String] = [file:///export/data/test.txt](file:///\\export\data\test.txt)
              MapPartitionsRDD[5] at textFile at :24
  scala> val words = lines.flatMap(line => line.split(" "))
  words: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[14] at
                             map at :25

  在上述代码中,lines. flatMap(line => line.split(“ ”))等价于先实行lines.map(line => line.split(“ ”))操作(请参考map(func)操作),再实行flat()操作(即扁平化操作),把wordArray中的每个RDD都扁平成多个元素,被扁平后得到的元素构成一个新的RDD(即words)。

  groupByKey()

  groupByKey()主要用于(Key,Value)键值对的数据集,将具有相同Key的Value进行分组,会返回一个新的(Key,Iterable)形式的数据集。同样以文件test.txt为例,接下来,通过一张图来描述如何通过groupByKey算子操作,将文件内容中的所有单词进行分组,具体过程如图4示。

1608545606730_25.jpg

图4 groupByKey算子操作

  在图4中,通过groupByKey操作把(Key,Value)键值对类型的RDD,按单词将单词出现的次数进行分组,这些元素组成的集合是一个新的RDD。接下来,通过代码来进行演示,具体代码如下:

scala> val lines = sc.textFile("file:///export/data/test.txt")
  lines: org.apache.spark.rdd.RDD[String] = [file:///export/data/test.txt](file:///\\export\data\test.txt)             MapPartitionsRDD[6] at textFile at :24
  scala> val words=lines.flatMap(line=>line.split(" ")).map(word=>(word,1))
  words: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[15] at                            map at :25
  scala> val groupWords=words.groupByKey()
   groupWords: org.apache.spark.rdd.RDD[(String,Iterable[Int])]=ShuffledRDD[16]
                        at groupByKey at :25

  上述代码中,words.groupByKey()操作实行后,RDD中所有的Key相同的Value都被合并到一起。例如,(“spark”,1)、(“spark”,1)、(“spark”,1)这三个键值对的Key都是“spark”,合并后得到新的键值对(“spark”,(1,1,1))。

  reduceByKey(func)

  reduceByKey()主要用于(Key,Value)键值对的数据集,返回的是一个新的(Key,Iterable)形式的数据集,该数据集是每个Key传递给函数func进行聚合运算后得到的结果。同样以文件test.txt(内容如文件3-1),接下来,通过一张图来描述如何通过reduceByKey算子操作统计单词出现的次数,具体操作如图5所示。

1608545621121_26.jpg
 图5 reduceByKey()算子操作

  在图5中,通过reduceByKey操作把(Key,Value)键值对类型的RDD,按单词Key将单词出现的次数Value进行聚合,这些元素组成的集合是一个新的RDD。接下来,通过代码来进行演示,具体代码如下:

 scala> val lines = sc.textFile("file:///export/data/test.txt")  lines: org.apache.spark.rdd.RDD[String] = [file:///export/data/test.txt](file:///\\export\data\test.txt)
              MapPartitionsRDD[7] at textFile at :24
  scala> val words=lines.flatMap(line=>line.split(" ")).map(word=>(word,1))  words: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[16] at                              map at :25
  scala> val reduceWords=words.reduceByKey((a,b)=>a+b)
  reduceWords: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[17] at
                         reduceByKey at :25

  上述代码中,实行words.reduceByKey((a,b) => a + b)操作,共分为两个步骤,分别是先实行reduceByKey()操作,将所有Key相同的Value值合并到一起,生成一个新的键值对(例如(“spark”,(1,1,1)));然后实行函数func的操作,即使用(a,b)=> a + b函数把(1,1,1)进行聚合求和,得到最终的结果,即(“spark”,3)。    

猜你喜欢:

怎样使用Spark Shell来读取HDFS文件

HBase数据库是怎样存储数据的?

怎样使用Linux和HDFS创建RDD?

传智大数据培训课程

opebet网页版|bet亚洲官网手机版

XML 地图 | Sitemap 地图