
#### Background ####

Spark GraphX does not ship a maximal clique mining algorithm.

The maximal clique algorithms in common use are serial and are based on the Bron–Kerbosch algorithm.
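For orientation, here is a minimal sketch of the plain Bron–Kerbosch recursion, without pivoting or vertex ordering; the function name and the `report` callback are illustrative only. The complete Java implementation actually used by this job (with pivoting and a degeneracy order) appears at the end of the post.

```scala
// Plain Bron–Kerbosch: R is the clique being grown, P the candidate vertices,
// X the vertices already excluded. When P and X are both empty, R is maximal.
// `graph` maps every vertex to its neighbor set.
def bronKerbosch(graph: Map[Long, Set[Long]],
                 r: Set[Long], p: Set[Long], x: Set[Long],
                 report: Set[Long] => Unit): Unit = {
  if (p.isEmpty && x.isEmpty) {
    report(r)                                  // found a maximal clique
  } else {
    var pVar = p
    var xVar = x
    for (v <- p) {
      val nv = graph.getOrElse(v, Set.empty[Long])
      bronKerbosch(graph, r + v, pVar intersect nv, xVar intersect nv, report)
      pVar -= v                                // move v from P to X
      xVar += v
    }
  }
}
```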

#### Approach ####

Spark GraphX does provide a connected-components algorithm. Connected components and maximal cliques are both concepts on undirected graphs, and every maximal clique is contained within a single connected component.

So the idea is to use Spark GraphX to find the connected components, then run a serial maximal clique algorithm inside each component to extract its maximal cliques ("pseudo-parallelization"). A sketch of the grouping step follows.
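A minimal sketch of this grouping step, assuming the edge list is already deduplicated and held as an `RDD[(Long, Long)]`. The function name `edgesByComponent` and the direct `groupByKey` are mine for illustration; the full program below instead does a two-stage `aggregateByKey` with a random key prefix to spread out skewed components.

```scala
import org.apache.spark.graphx.{Edge, Graph}
import org.apache.spark.rdd.RDD

// Label every vertex with its connected component via GraphX, then gather
// each component's edge list so it can be handed to a serial clique finder.
def edgesByComponent(edges: RDD[(Long, Long)]): RDD[(Long, Iterable[(Long, Long)])] = {
  val graph = Graph.fromEdges(edges.map { case (s, d) => Edge(s, d, 0) }, 0)
  val cc = graph.connectedComponents().vertices       // (vertexId, componentId)
  edges.keyBy(_._1)                                    // key each edge by its source vertex
    .join(cc)                                          // attach the component id of the source
    .map { case (_, (edge, component)) => (component, edge) }
    .groupByKey()                                      // all edges of one component together
}
```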

For densely connected graphs, the connected components that come out can be very large, and the serial maximal clique algorithm still takes a long time on them. Pruning is used here to shrink the input (see the sketch below), but for large graphs the room for optimization is limited.
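A minimal sketch of one pruning round under the same `RDD[(Long, Long)]` assumption (`pruneOnce` is an illustrative name): a vertex that touches fewer than `count - 1` edges can never be part of a clique of `count` vertices, so every edge incident to such a vertex can be dropped. The full program broadcasts the surviving vertex set, repeats the round up to `numIter` times, and stops early once a round keeps at least `percent`% of the previous round's vertices.

```scala
import org.apache.spark.rdd.RDD

// Drop all edges that touch a vertex of degree < count - 1.
def pruneOnce(edges: RDD[(Long, Long)], count: Int): RDD[(Long, Long)] = {
  val keep = edges
    .flatMap { case (a, b) => Seq((a, 1), (b, 1)) }
    .reduceByKey(_ + _)                                // vertex degree
    .filter { case (_, deg) => deg >= count - 1 }
    .keys
    .collect()
    .toSet                                             // the full code broadcasts this set
  edges.filter { case (a, b) => keep.contains(a) && keep.contains(b) }
}
```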

A truly parallel maximal clique algorithm is still something to look forward to.

#### Configuration ####

```properties
graph_data_path=hdfs://localhost/graph_data
out_path=hdfs://localhost/clique
ck_path=hdfs://localhost/checkpoint
# number of pruning iterations
numIter=50
# minimum number of vertices in a maximal clique
count=3
# maximal clique algorithm: 1 = own implementation, 2 = jgrapht
algorithm=2
# surviving vertices as a percentage of the previous round; if 90% of the data
# is still left after a round, pruning is no longer paying off, so stop
percent=90
spark.master=local
spark.app.name=graph
spark.serializer=org.apache.spark.serializer.KryoSerializer
spark.yarn.executor.memoryOverhead=20480
spark.yarn.driver.memoryOverhead=20480
spark.driver.extraJavaOptions=-XX:+UseG1GC -XX:+UseCompressedOops -XX:+DisableExplicitGC
spark.executor.extraJavaOptions=-XX:+UseG1GC -XX:+UseCompressedOops -XX:+DisableExplicitGC
spark.driver.maxResultSize=10g
spark.default.parallelism=60
```
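For reference, a minimal sketch of how the full program below consumes this file: every key that starts with `spark` is forwarded verbatim to `SparkConf`, and the remaining keys are read individually as job parameters.

```scala
import java.util.Properties
import org.apache.spark.SparkConf
import scala.collection.JavaConverters._

val prop = new Properties()
prop.load(getClass.getResourceAsStream("/config.properties"))

val conf = new SparkConf()
prop.stringPropertyNames().asScala
  .filter(_.startsWith("spark"))                 // spark.* keys go straight to SparkConf
  .foreach(k => conf.set(k, prop.getProperty(k)))
```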

The second option (`algorithm=2`) uses the `BronKerboschCliqueFinder` from the jgrapht library.

#### Sample data ####

{"src":"0","dst":"1"} {"src":"0","dst":"2"} {"src":"0","dst":"3"} {"src":"1","dst":"0"} {"src":"2","dst":"1"} {"src":"3","dst":"5"} {"src":"4","dst":"6"} {"src":"5","dst":"4"} {"src":"6","dst":"5"} {"src":"3","dst":"2"} {"src":"2","dst":"3"} {"src":"6","dst":"4"} {"src":"3","dst":"4"} {"src":"4","dst":"3"} {"src":"2","dst":"6"} {"src":"6","dst":"2"} {"src":"6","dst":"7"} {"src":"7","dst":"6"}

#### Sample graph ####

(Figure: the undirected sample graph built from the edges above.)

#### Output ####

```
0,1,2
0,2,3
3,4,5
4,5,6
```

#### Implementation ####

```scala
import java.util
import java.util.Properties

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.graphx.{Edge, Graph}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}
import org.jgrapht.alg.BronKerboschCliqueFinder
import org.jgrapht.graph.{DefaultEdge, SimpleGraph}

import scala.collection.JavaConverters._
import scala.collection.mutable

object ApplicationTitan {
  def main(args: Array[String]) {
    val prop = new Properties()
    prop.load(getClass.getResourceAsStream("/config.properties"))

    val graph_data_path = prop.getProperty("graph_data_path")
    val out_path = prop.getProperty("out_path")
    val ck_path = prop.getProperty("ck_path")
    val count = Integer.parseInt(prop.getProperty("count"))
    val numIter = Integer.parseInt(prop.getProperty("numIter"))
    val algorithm = Integer.parseInt(prop.getProperty("algorithm"))
    val percent = Integer.parseInt(prop.getProperty("percent"))
    val conf = new SparkConf()
    try {
      Runtime.getRuntime.exec("hdfs dfs -rm -r " + out_path)
      // Runtime.getRuntime.exec("cmd.exe /C rd /s /q " + out_path)
    } catch {
      case ex: Exception =>
        ex.printStackTrace(System.out)
    }

    // forward all spark.* properties to SparkConf
    prop.stringPropertyNames().asScala.foreach(s => {
      if (s.startsWith("spark")) {
        conf.set(s, prop.getProperty(s))
      }
    })
    conf.registerKryoClasses(Array(getClass))
    val sc = new SparkContext(conf)
    sc.setLogLevel("ERROR")
    sc.setCheckpointDir(ck_path)
    val sqlc = new SQLContext(sc)
    try {
      val e_df = sqlc.read
        // .json(graph_data_path)
        .parquet(graph_data_path)
      var e_rdd = e_df
        .mapPartitions(it => {
          it.map({
            case Row(dst: String, src: String) =>
              val src_long = src.toLong
              val dst_long = dst.toLong
              if (src_long < dst_long) (src_long, dst_long) else (dst_long, src_long)
          })
        }).distinct()
      e_rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)

      var bc: Broadcast[Set[Long]] = null
      var iter = 0
      var bc_size = 0
      // pruning: repeatedly drop vertices whose degree is below count - 1
      while (iter <= numIter) {
        val temp = e_rdd
          .flatMap(x => List((x._1, 1), (x._2, 1)))
          .reduceByKey((x, y) => x + y)
          .filter(x => x._2 >= count - 1)
          .mapPartitions(it => it.map(x => x._1))
        val bc_value = temp.collect().toSet
        bc = sc.broadcast(bc_value)
        e_rdd = e_rdd.filter(x => bc.value.contains(x._1) && bc.value.contains(x._2))
        e_rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)
        iter += 1
        if (bc_size != 0 && bc_value.size >= bc_size * percent / 100) {
          println("total iter: " + iter)
          iter = Int.MaxValue
        }
        bc_size = bc_value.size
      }

      // build the graph
      val edge: RDD[Edge[Long]] = e_rdd.mapPartitions(it => it.map(x => Edge(x._1, x._2)))
      val graph = Graph.fromEdges(edge, 0, StorageLevel.MEMORY_AND_DISK_SER, StorageLevel.MEMORY_AND_DISK_SER)

      // connected components
      val cc = graph.connectedComponents().vertices
      cc.persist(StorageLevel.MEMORY_AND_DISK_SER)

      cc.join(e_rdd)
        .mapPartitions(it => it.map(x => ((math.random * 10).toInt.toString.concat(x._2._1.toString), (x._1, x._2._2))))
        .aggregateByKey(List[(Long, Long)]())((list, v) => list :+ v, (list1, list2) => list1 ::: list2)
        .mapPartitions(it => it.map(x => (x._1.substring(1), x._2)))
        .aggregateByKey(List[(Long, Long)]())((list1, list2) => list1 ::: list2, (list3, list4) => list3 ::: list4)
        .filter(x => x._2.size >= count - 1)
        .flatMap(x => {
          if (algorithm == 1)
            find(x, count)
          else
            find2(x, count)
        })
        .mapPartitions(it => {
          it.map(set => {
            var temp = ""
            set.asScala.foreach(x => temp += x + ",")
            temp.substring(0, temp.length - 1)
          })
        })
        // .coalesce(1)
        .saveAsTextFile(out_path)
    }
    catch {
      case ex: Exception =>
        ex.printStackTrace(System.out)
    }
    sc.stop()
  }

  // own maximal clique implementation
  def find(x: (String, List[(Long, Long)]), count: Int): mutable.Set[util.Set[String]] = {
    println(x._1 + "|s|" + x._2.size)
    println("BKCliqueFinder---" + x._1 + "---" + System.currentTimeMillis())
    val neighbors = new util.HashMap[String, util.Set[String]]
    val finder = new CliqueFinder(neighbors, count)
    x._2.foreach(r => {
      val v1 = r._1.toString
      val v2 = r._2.toString
      if (neighbors.containsKey(v1)) {
        neighbors.get(v1).add(v2)
      } else {
        val temp = new util.HashSet[String]()
        temp.add(v2)
        neighbors.put(v1, temp)
      }
      if (neighbors.containsKey(v2)) {
        neighbors.get(v2).add(v1)
      } else {
        val temp = new util.HashSet[String]()
        temp.add(v1)
        neighbors.put(v2, temp)
      }
    })
    println("BKCliqueFinder---" + x._1 + "---" + System.currentTimeMillis())
    finder.findMaxCliques().asScala
  }

  // maximal clique algorithm from jgrapht
  def find2(x: (String, List[(Long, Long)]), count: Int): Set[util.Set[String]] = {
    println(x._1 + "|s|" + x._2.size)
    println("BKCliqueFinder---" + x._1 + "---" + System.currentTimeMillis())
    val to_clique = new SimpleGraph[String, DefaultEdge](classOf[DefaultEdge])
    x._2.foreach(r => {
      val v1 = r._1.toString
      val v2 = r._2.toString
      to_clique.addVertex(v1)
      to_clique.addVertex(v2)
      to_clique.addEdge(v1, v2)
    })
    val finder = new BronKerboschCliqueFinder(to_clique)
    val list = finder.getAllMaximalCliques.asScala
    var result = Set[util.Set[String]]()
    list.foreach(x => {
      if (x.size() >= count)
        result = result + x
    })
    println("BKCliqueFinder---" + x._1 + "---" + System.currentTimeMillis())
    result
  }
}
```

The own maximal clique implementation (used when `algorithm=1`):

```java
import java.util.*;

/**
 * @author mopspecial@gmail.com
 * @date 2017/7/31
 */
public class CliqueFinder {
    private Map<String, Set<String>> neighbors;
    private Set<String> nodes;
    private Set<Set<String>> maxCliques = new HashSet<>();
    private Integer minSize;

    public CliqueFinder(Map<String, Set<String>> neighbors, Integer minSize) {
        this.neighbors = neighbors;
        this.nodes = neighbors.keySet();
        this.minSize = minSize;
    }

    // outer level: Bron–Kerbosch over a degeneracy ordering of the candidates
    private void bk3(Set<String> clique, List<String> candidates, List<String> excluded) {
        if (candidates.isEmpty() && excluded.isEmpty()) {
            if (!clique.isEmpty() && clique.size() >= minSize) {
                maxCliques.add(clique);
            }
            return;
        }
        for (String s : degeneracy_order(candidates)) {
            List<String> new_candidates = new ArrayList<>(candidates);
            new_candidates.retainAll(neighbors.get(s));
            List<String> new_excluded = new ArrayList<>(excluded);
            new_excluded.retainAll(neighbors.get(s));
            Set<String> nextClique = new HashSet<>(clique);
            nextClique.add(s);
            bk2(nextClique, new_candidates, new_excluded);
            candidates.remove(s);
            excluded.add(s);
        }
    }

    // inner recursion: Bron–Kerbosch with a pivot
    private void bk2(Set<String> clique, List<String> candidates, List<String> excluded) {
        if (candidates.isEmpty() && excluded.isEmpty()) {
            if (!clique.isEmpty() && clique.size() >= minSize) {
                maxCliques.add(clique);
            }
            return;
        }
        String pivot = pick_random(candidates);
        if (pivot == null) {
            pivot = pick_random(excluded);
        }
        List<String> tempc = new ArrayList<>(candidates);
        tempc.removeAll(neighbors.get(pivot));
        for (String s : tempc) {
            List<String> new_candidates = new ArrayList<>(candidates);
            new_candidates.retainAll(neighbors.get(s));
            List<String> new_excluded = new ArrayList<>(excluded);
            new_excluded.retainAll(neighbors.get(s));
            Set<String> nextClique = new HashSet<>(clique);
            nextClique.add(s);
            bk2(nextClique, new_candidates, new_excluded);
            candidates.remove(s);
            excluded.add(s);
        }
    }

    private List<String> degeneracy_order(List<String> innerNodes) {
        List<String> result = new ArrayList<>();
        Map<String, Integer> deg = new HashMap<>();
        for (String node : innerNodes) {
            deg.put(node, neighbors.get(node).size());
        }
        while (!deg.isEmpty()) {
            Integer min = Collections.min(deg.values());
            String minKey = null;
            for (String key : deg.keySet()) {
                if (deg.get(key).equals(min)) {
                    minKey = key;
                    break;
                }
            }
            result.add(minKey);
            deg.remove(minKey);
            for (String k : neighbors.get(minKey)) {
                if (deg.containsKey(k)) {
                    deg.put(k, deg.get(k) - 1);
                }
            }
        }
        return result;
    }

    private String pick_random(List<String> random) {
        if (random != null && !random.isEmpty()) {
            return random.get(0);
        } else {
            return null;
        }
    }

    public Set<Set<String>> findMaxCliques() {
        this.bk3(new HashSet<>(), new ArrayList<>(nodes), new ArrayList<>());
        return maxCliques;
    }

    public static void main(String[] args) {
        Map<String, Set<String>> neighbors = new HashMap<>();
        neighbors.put("0", new HashSet<>(Arrays.asList("1", "2", "3")));
        neighbors.put("1", new HashSet<>(Arrays.asList("0", "2")));
        neighbors.put("2", new HashSet<>(Arrays.asList("0", "1", "3", "6")));
        neighbors.put("3", new HashSet<>(Arrays.asList("0", "2", "4", "5")));
        neighbors.put("4", new HashSet<>(Arrays.asList("3", "5", "6")));
        neighbors.put("5", new HashSet<>(Arrays.asList("3", "4", "6")));
        neighbors.put("6", new HashSet<>(Arrays.asList("2", "4", "5")));
        neighbors.put("7", new HashSet<>(Arrays.asList("6")));
        CliqueFinder finder = new CliqueFinder(neighbors, 3);
        finder.bk3(new HashSet<>(), new ArrayList<>(neighbors.keySet()), new ArrayList<>());
        System.out.println(finder.maxCliques);
    }
}
```