Study notes on basic Spark RDD operators; I hope they are helpful.
collect
Returns all elements of the RDD.
- scala> var input = sc.parallelize(Array(-1, 0, 1, 2, 2))
- input: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[15] at parallelize at <console>:27
- scala> var result = input.collect
- result: Array[Int] = Array(-1, 0, 1, 2, 2)
count, countByValue
count returns the number of elements in the RDD; countByValue returns the number of occurrences of each distinct value.
- scala> var input = sc.parallelize(Array(-1, 0, 1, 2, 2))
- scala> var result = input.count
- result: Long = 5
- scala> var result = input.countByValue
- result: scala.collection.Map[Int,Long] = Map(0 -> 1, 1 -> 1, 2 -> 2, -1 -> 1)
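The semantics of countByValue can be sketched on a plain Scala collection, with no Spark cluster needed (a local analogue, not the Spark API itself): group equal values, then count each group.

```scala
// Local-collection analogue of RDD.countByValue:
// group equal values together, then map each group to its size.
val input = List(-1, 0, 1, 2, 2)
val counts: Map[Int, Int] =
  input.groupBy(identity).map { case (v, vs) => (v, vs.length) }
// counts contains -1 -> 1, 0 -> 1, 1 -> 1, 2 -> 2
```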
take, top, takeOrdered
take returns the first N elements of the RDD. takeOrdered returns the first N elements in ascending order by default; a custom ordering can be supplied. top returns the first N elements in descending order.
- var input = sc.parallelize(Array(1, 2, 3, 4, 9, 8, 7, 5, 6))
- scala> var result = input.take(6)
- result: Array[Int] = Array(1, 2, 3, 4, 9, 8)
- scala> var result = input.take(20)
- result: Array[Int] = Array(1, 2, 3, 4, 9, 8, 7, 5, 6)
- scala> var result = input.takeOrdered(6)
- result: Array[Int] = Array(1, 2, 3, 4, 5, 6)
- scala> var result = input.takeOrdered(6)(Ordering[Int].reverse)
- result: Array[Int] = Array(9, 8, 7, 6, 5, 4)
- scala> var result = input.top(6)
- result: Array[Int] = Array(9, 8, 7, 6, 5, 4)
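The same ordering behavior can be reproduced on local Scala collections (a sketch, not the Spark API): sort, optionally with a reversed Ordering, then take the first N.

```scala
val nums = List(1, 2, 3, 4, 9, 8, 7, 5, 6)
// ascending order, first 6 elements: like rdd.takeOrdered(6)
val smallest6 = nums.sorted.take(6)
// descending order, first 6 elements: like rdd.top(6)
val largest6 = nums.sorted(Ordering[Int].reverse).take(6)
```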
filter
Takes a function returning a Boolean and keeps only the elements for which that function returns true.
- scala> var input = sc.parallelize(Array(-1, 0, 1, 2))
- scala> var result = input.filter(_ > 0).collect()
- result: Array[Int] = Array(1, 2)
map, flatMap
map applies a function to every element, producing a new RDD. flatMap is similar, but flattens the results: the sequences produced for each element are concatenated into a single output sequence.
- scala> var input = sc.parallelize(Array(-1, 0, 1, 2))
- scala> var result = input.map(_ + 1).collect
- result: Array[Int] = Array(0, 1, 2, 3)
- scala> var result = input.map(x => x.to(3)).collect
- result: Array[scala.collection.immutable.Range.Inclusive] = Array(Range(-1, 0, 1, 2, 3), Range(0, 1, 2, 3), Range(1, 2, 3), Range(2, 3))
- scala> var result = input.flatMap(x => x.to(3)).collect
- result: Array[Int] = Array(-1, 0, 1, 2, 3, 0, 1, 2, 3, 1, 2, 3, 2, 3)
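The map-vs-flatMap difference is the same on local Scala collections (a local sketch of the behavior above): map keeps the nested structure, flatMap concatenates the inner sequences.

```scala
val xs = List(-1, 0, 1, 2)
// map keeps each Range as its own inner list
val nested = xs.map(x => x.to(3).toList)
// flatMap concatenates the inner lists into one flat list
val flat = xs.flatMap(x => x.to(3).toList)
```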
distinct
Removes duplicate elements from the RDD.
- scala> var input = sc.parallelize(Array(-1, 0, 1, 2, 2))
- scala> var result = input.distinct.collect
- result: Array[Int] = Array(0, 1, 2, -1)
reduce
Aggregates all elements of the RDD with the given function. The function should be commutative and associative, since elements are combined partition by partition.
- scala> var input = sc.parallelize(Array(-1, 0, 1, 2))
- scala> var result = input.reduce((x, y) => { println(x, y); x + y })
- (-1,1) // combines -1 and 1, giving 0; {0, 2} remain
- (0,2) // combines the previous 0 with 2, giving 2; {0} remains
- (2,0) // combines 2 with the last 0, giving the final result 2
- result: Int = 2
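Locally, the pairwise combining that reduce performs can be sketched with Scala's reduceLeft (an analogue, not the distributed implementation; Spark additionally combines per-partition results, which is why the function must be commutative and associative):

```scala
val elems = List(-1, 0, 1, 2)
// fold the elements pairwise with the combining function
val total = elems.reduceLeft { (x, y) => x + y }
// total is -1 + 0 + 1 + 2 = 2
```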
sample, takeSample
sample draws a sample from the RDD. The first parameter, withReplacement, selects sampling with replacement (true) or without (false); with replacement, the sample may contain duplicates. The second parameter, fraction, is the expected fraction of elements to sample, between 0 and 1 when sampling without replacement. takeSample is similar but returns an Array; its first parameter is withReplacement and its second is the exact number of elements to return.
- var rdd = sc.parallelize(1 to 20)
- scala> rdd.sample(true, 0.5).collect
- res33: Array[Int] = Array(6, 8, 13, 15, 17, 17, 17, 18, 20)
- scala> rdd.sample(false, 0.5).collect
- res35: Array[Int] = Array(1, 3, 10, 11, 12, 13, 14, 17, 18)
- scala> rdd.sample(true, 1).collect
- res44: Array[Int] = Array(2, 2, 3, 5, 6, 6, 8, 9, 9, 10, 10, 10, 14, 15, 16, 17, 17, 18, 19, 19, 20, 20)
- scala> rdd.sample(false, 1).collect
- res46: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20)
- scala> rdd.takeSample(true, 3)
- res1: Array[Int] = Array(1, 15, 19)
- scala> rdd.takeSample(false, 3)
- res2: Array[Int] = Array(7, 16, 6)
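A simplified local sketch of the two sampling modes (my own analogy using scala.util.Random; Spark's actual samplers are more sophisticated): without replacement, each element is kept independently with probability roughly equal to fraction; with replacement, a fixed number of elements is drawn and duplicates are possible.

```scala
import scala.util.Random

val data = (1 to 20).toList
val rng = new Random(42) // fixed seed so the run is repeatable

// without replacement: keep each element with probability ~0.5,
// so the result size varies but contains no duplicates
val sampled = data.filter(_ => rng.nextDouble() < 0.5)

// with replacement: draw exactly 3 elements; duplicates are possible
val drawn = List.fill(3)(data(rng.nextInt(data.length)))
```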
collectAsMap, countByKey, lookup
collectAsMap converts a pair RDD into a Map; if the same key appears more than once, later values overwrite earlier ones. countByKey counts the occurrences of each key. lookup returns all values for a given key.
- scala> var input = sc.parallelize(List((1, "1"), (1, "one"), (2, "two"), (3, "three"), (4, "four")))
- scala> var result = input.collectAsMap
- result: scala.collection.Map[Int,String] = Map(2 -> two, 4 -> four, 1 -> one, 3 -> three)
- scala> var result = input.countByKey
- result: scala.collection.Map[Int,Long] = Map(1 -> 2, 2 -> 1, 3 -> 1, 4 -> 1)
- scala> var result = input.lookup(1)
- result: Seq[String] = WrappedArray(1, one)
- scala> var result = input.lookup(2)
- result: Seq[String] = WrappedArray(two)
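The overwrite behavior of collectAsMap mirrors Scala's own toMap, where later entries for the same key win; lookup behaves like collecting all values for one key. A local sketch (an analogue, not the Spark API):

```scala
val pairs = List((1, "1"), (1, "one"), (2, "two"))
// like collectAsMap: duplicate key 1, the later value "one" wins
val asMap = pairs.toMap
// like lookup(1): gather every value stored under key 1
val valuesFor1 = pairs.collect { case (1, v) => v }
```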
groupBy, keyBy
groupBy uses the given function to derive a key for each element, producing a K-V RDD, and groups elements that share a key. keyBy pairs each value with a key computed by the given function.
- scala> var rdd = sc.parallelize(List("A1", "A2", "B1", "B2", "C"))
- scala> var result = rdd.groupBy(_.substring(0, 1)).collect
- result: Array[(String, Iterable[String])] = Array((A,CompactBuffer(A1, A2)), (B,CompactBuffer(B1, B2)), (C,CompactBuffer(C)))
- scala> var rdd = sc.parallelize(List("hello", "world", "spark", "is", "fun"))
- scala> var result = rdd.keyBy(_.length).collect
- result: Array[(Int, String)] = Array((5,hello), (5,world), (5,spark), (2,is), (3,fun))
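Both operators have direct counterparts on local Scala collections (a sketch of the same semantics without Spark): keyBy is a map that pairs each element with a derived key, and groupBy buckets elements by that key.

```scala
val words = List("hello", "world", "spark", "is", "fun")
// like rdd.keyBy(_.length): prepend a derived key to each value
val keyed = words.map(w => (w.length, w))
// like rdd.groupBy(_.substring(0, 1)): bucket by first character
val grouped = List("A1", "A2", "B1", "B2", "C").groupBy(_.take(1))
```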
keys, values
keys returns an RDD of the keys of a pair RDD; values returns an RDD of its values.
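On a local collection of pairs, the same projections are just maps over the tuple components (a sketch, not the Spark API):

```scala
val kv = List((1, "one"), (2, "two"), (3, "three"))
val ks = kv.map(_._1) // like pairRdd.keys
val vs = kv.map(_._2) // like pairRdd.values
```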
union, intersection, subtract, cartesian
union merges two RDDs without removing duplicates. subtract removes from the first RDD the elements that also appear in the second. intersection returns the elements common to both RDDs. cartesian returns the Cartesian product of the two RDDs.
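These four set-style operators can be illustrated on local Scala collections (a local analogue; on real RDDs, intersection and subtract involve shuffles and do not guarantee output order):

```scala
val a = List(1, 2, 3)
val b = List(3, 4)
val unionLike = a ++ b                             // like union: duplicates kept
val intersectLike = a.intersect(b)                 // like intersection
val subtractLike = a.diff(b)                       // like subtract
val cartesianLike = for (x <- a; y <- b) yield (x, y) // like cartesian
```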