音效素材网提供各类素材,打造精品素材网站!

站内导航 站长工具 投稿中心 手机访问

音效素材

SparkGraphx计算指定节点的N度关系节点源码
日期:2017-10-09 09:39:32   来源:脚本之家

直接上代码:

package horizon.graphx.util
import java.security.InvalidParameterException
import horizon.graphx.util.CollectionUtil.CollectionHelper
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag
/**
 * Created by yepei.ye on 2017/1/19.
 * Description:用于在图中为指定的节点计算这些节点的N度关系节点,输出这些节点与源节点的路径长度和节点id
 */
object GraphNdegUtil {
 val maxNDegVerticesCount = 10000
 val maxDegree = 1000
 /**
 * 计算节点的N度关系
 *
 * @param edges
 * @param choosedVertex
 * @param degree
 * @tparam ED
 * @return
 */
 def aggNdegreedVertices[ED: ClassTag](edges: RDD[(VertexId, VertexId)], choosedVertex: RDD[VertexId], degree: Int): VertexRDD[Map[Int, Set[VertexId]]] = {
 val simpleGraph = Graph.fromEdgeTuples(edges, 0, Option(PartitionStrategy.EdgePartition2D), StorageLevel.MEMORY_AND_DISK_SER, StorageLevel.MEMORY_AND_DISK_SER)
 aggNdegreedVertices(simpleGraph, choosedVertex, degree)
 }
 def aggNdegreedVerticesWithAttr[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED], choosedVertex: RDD[VertexId], degree: Int, sendFilter: (VD, VD) => Boolean = (_: VD, _: VD) => true): VertexRDD[Map[Int, Set[VD]]] = {
 val ndegs: VertexRDD[Map[Int, Set[VertexId]]] = aggNdegreedVertices(graph, choosedVertex, degree, sendFilter)
 val flated: RDD[Ver[VD]] = ndegs.flatMap(e => e._2.flatMap(t => t._2.map(s => Ver(e._1, s, t._1, null.asInstanceOf[VD])))).persist(StorageLevel.MEMORY_AND_DISK_SER)
 val matched: RDD[Ver[VD]] = flated.map(e => (e.id, e)).join(graph.vertices).map(e => e._2._1.copy(attr = e._2._2)).persist(StorageLevel.MEMORY_AND_DISK_SER)
 flated.unpersist(blocking = false)
 ndegs.unpersist(blocking = false)
 val grouped: RDD[(VertexId, Map[Int, Set[VD]])] = matched.map(e => (e.source, ArrayBuffer(e))).reduceByKey(_ ++= _).map(e => (e._1, e._2.map(t => (t.degree, Set(t.attr))).reduceByKey(_ ++ _).toMap))
 matched.unpersist(blocking = false)
 VertexRDD(grouped)
 }
 def aggNdegreedVertices[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED],
              choosedVertex: RDD[VertexId],
              degree: Int,
              sendFilter: (VD, VD) => Boolean = (_: VD, _: VD) => true
              ): VertexRDD[Map[Int, Set[VertexId]]] = {
 if (degree < 1) {
  throw new InvalidParameterException("度参数错误:" + degree)
 }
 val initVertex = choosedVertex.map(e => (e, true)).persist(StorageLevel.MEMORY_AND_DISK_SER)
 var g: Graph[DegVertex[VD], Int] = graph.outerJoinVertices(graph.degrees)((_, old, deg) => (deg.getOrElse(0), old))
  .subgraph(vpred = (_, a) => a._1 <= maxDegree)
  //去掉大节点
  .outerJoinVertices(initVertex)((id, old, hasReceivedMsg) => {
  DegVertex(old._2, hasReceivedMsg.getOrElse(false), ArrayBuffer((id, 0))) //初始化要发消息的节点
 }).mapEdges(_ => 0).cache() //简化边属性
 choosedVertex.unpersist(blocking = false)
 var i = 0
 var prevG: Graph[DegVertex[VD], Int] = null
 var newVertexRdd: VertexRDD[ArrayBuffer[(VertexId, Int)]] = null
 while (i < degree + 1) {
  prevG = g
  //发第i+1轮消息
  newVertexRdd = prevG.aggregateMessages[ArrayBuffer[(VertexId, Int)]](sendMsg(_, sendFilter), (a, b) => reduceVertexIds(a ++ b)).persist(StorageLevel.MEMORY_AND_DISK_SER)
  g = g.outerJoinVertices(newVertexRdd)((vid, old, msg) => if (msg.isDefined) updateVertexByMsg(vid, old, msg.get) else old.copy(init = false)).cache()
  prevG.unpersistVertices(blocking = false)
  prevG.edges.unpersist(blocking = false)
  newVertexRdd.unpersist(blocking = false)
  i += 1
 }
 newVertexRdd.unpersist(blocking = false)
 val maped = g.vertices.join(initVertex).mapValues(e => sortResult(e._1)).persist(StorageLevel.MEMORY_AND_DISK_SER)
 initVertex.unpersist()
 g.unpersist(blocking = false)
 VertexRDD(maped)
 }
 private case class Ver[VD: ClassTag](source: VertexId, id: VertexId, degree: Int, attr: VD = null.asInstanceOf[VD])
 private def updateVertexByMsg[VD: ClassTag](vertexId: VertexId, oldAttr: DegVertex[VD], msg: ArrayBuffer[(VertexId, Int)]): DegVertex[VD] = {
 val addOne = msg.map(e => (e._1, e._2 + 1))
 val newMsg = reduceVertexIds(oldAttr.degVertices ++ addOne)
 oldAttr.copy(init = msg.nonEmpty, degVertices = newMsg)
 }
 private def sortResult[VD: ClassTag](degs: DegVertex[VD]): Map[Int, Set[VertexId]] = degs.degVertices.map(e => (e._2, Set(e._1))).reduceByKey(_ ++ _).toMap
 case class DegVertex[VD: ClassTag](var attr: VD, init: Boolean = false, degVertices: ArrayBuffer[(VertexId, Int)])
 case class VertexDegInfo[VD: ClassTag](var attr: VD, init: Boolean = false, degVertices: ArrayBuffer[(VertexId, Int)])
 private def sendMsg[VD: ClassTag](e: EdgeContext[DegVertex[VD], Int, ArrayBuffer[(VertexId, Int)]], sendFilter: (VD, VD) => Boolean): Unit = {
 try {
  val src = e.srcAttr
  val dst = e.dstAttr
  //只有dst是ready状态才接收消息
  if (src.degVertices.size < maxNDegVerticesCount && (src.init || dst.init) && dst.degVertices.size < maxNDegVerticesCount && !isAttrSame(src, dst)) {
  if (sendFilter(src.attr, dst.attr)) {
   e.sendToDst(reduceVertexIds(src.degVertices))
  }
  if (sendFilter(dst.attr, dst.attr)) {
   e.sendToSrc(reduceVertexIds(dst.degVertices))
  }
  }
 } catch {
  case ex: Exception =>
  println(s"==========error found: exception:${ex.getMessage}," +
   s"edgeTriplet:(srcId:${e.srcId},srcAttr:(${e.srcAttr.attr},${e.srcAttr.init},${e.srcAttr.degVertices.size}))," +
   s"dstId:${e.dstId},dstAttr:(${e.dstAttr.attr},${e.dstAttr.init},${e.dstAttr.degVertices.size}),attr:${e.attr}")
  ex.printStackTrace()
  throw ex
 }
 }
 private def reduceVertexIds(ids: ArrayBuffer[(VertexId, Int)]): ArrayBuffer[(VertexId, Int)] = ArrayBuffer() ++= ids.reduceByKey(Math.min)
 private def isAttrSame[VD: ClassTag](a: DegVertex[VD], b: DegVertex[VD]): Boolean = a.init == b.init && allKeysAreSame(a.degVertices, b.degVertices)
 private def allKeysAreSame(a: ArrayBuffer[(VertexId, Int)], b: ArrayBuffer[(VertexId, Int)]): Boolean = {
 val aKeys = a.map(e => e._1).toSet
 val bKeys = b.map(e => e._1).toSet
 if (aKeys.size != bKeys.size || aKeys.isEmpty) return false
 aKeys.diff(bKeys).isEmpty && bKeys.diff(aKeys).isEmpty
 }
}

其中sortResult方法里对Traversable[(K,V)]类型的集合使用了reduceByKey方法,这个方法是自行封装的,使用时需要导入,代码如下:

/**
 * Created by yepei.ye on 2016/12/21.
 * Description:
 */
object CollectionUtil {
 /**
 * 对具有Traversable[(K, V)]类型的集合添加reduceByKey相关方法
 *
 * @param collection
 * @param kt
 * @param vt
 * @tparam K
 * @tparam V
 */
 implicit class CollectionHelper[K, V](collection: Traversable[(K, V)])(implicit kt: ClassTag[K], vt: ClassTag[V]) {
 def reduceByKey(f: (V, V) => V): Traversable[(K, V)] = collection.groupBy(_._1).map { case (_: K, values: Traversable[(K, V)]) => values.reduce((a, b) => (a._1, f(a._2, b._2))) }
 /**
  * reduceByKey的同时,返回被reduce掉的元素的集合
  *
  * @param f
  * @return
  */
 def reduceByKeyWithReduced(f: (V, V) => V)(implicit kt: ClassTag[K], vt: ClassTag[V]): (Traversable[(K, V)], Traversable[(K, V)]) = {
  val reduced: ArrayBuffer[(K, V)] = ArrayBuffer()
  val newSeq = collection.groupBy(_._1).map {
  case (_: K, values: Traversable[(K, V)]) => values.reduce((a, b) => {
   val newValue: V = f(a._2, b._2)
   val reducedValue: V = if (newValue == a._2) b._2 else a._2
   val reducedPair: (K, V) = (a._1, reducedValue)
   reduced += reducedPair
   (a._1, newValue)
  })
  }
  (newSeq, reduced.toTraversable)
 }
 }
}

总结

以上就是本文关于SparkGraphx计算指定节点的N度关系节点源码的全部内容了,希望对大家有所帮助。感兴趣的朋友可以参阅:浅谈七种常见的Hadoop和Spark项目案例  Spark的广播变量和累加器使用方法代码示例  Spark入门简介等,有什么问题请留言,小编会及时回复大家的。

    您感兴趣的教程

    在docker中安装mysql详解

    本篇文章主要介绍了在docker中安装mysql详解,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编...

    详解 安装 docker mysql

    win10中文输入法仅在桌面显示怎么办?

    win10中文输入法仅在桌面显示怎么办?

    win10系统使用搜狗,QQ输入法只有在显示桌面的时候才出来,在使用其他程序输入框里面却只能输入字母数字,win10中...

    win10 中文输入法

    一分钟掌握linux系统目录结构

    这篇文章主要介绍了linux系统目录结构,通过结构图和多张表格了解linux系统目录结构,感兴趣的小伙伴们可以参考一...

    结构 目录 系统 linux

    PHP程序员玩转Linux系列 Linux和Windows安装

    这篇文章主要为大家详细介绍了PHP程序员玩转Linux系列文章,Linux和Windows安装nginx教程,具有一定的参考价值,感兴趣...

    玩转 程序员 安装 系列 PHP

    win10怎么安装杜比音效Doby V4.1 win10安装杜

    第四代杜比®家庭影院®技术包含了一整套协同工作的技术,让PC 发出清晰的环绕声同时第四代杜比家庭影院技术...

    win10杜比音效

    纯CSS实现iOS风格打开关闭选择框功能

    这篇文章主要介绍了纯CSS实现iOS风格打开关闭选择框,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作...

    css ios c

    Win7如何给C盘扩容 Win7系统电脑C盘扩容的办法

    Win7如何给C盘扩容 Win7系统电脑C盘扩容的

    Win7给电脑C盘扩容的办法大家知道吗?当系统分区C盘空间不足时,就需要给它扩容了,如果不管,C盘没有足够的空间...

    Win7 C盘 扩容

    百度推广竞品词的投放策略

    SEM是基于关键词搜索的营销活动。作为推广人员,我们所做的工作,就是打理成千上万的关键词,关注它们的质量度...

    百度推广 竞品词

    Visual Studio Code(vscode) git的使用教程

    这篇文章主要介绍了详解Visual Studio Code(vscode) git的使用,小编觉得挺不错的,现在分享给大家,也给大家做个参考。...

    教程 Studio Visual Code git

    七牛云储存创始人分享七牛的创立故事与

    这篇文章主要介绍了七牛云储存创始人分享七牛的创立故事与对Go语言的应用,七牛选用Go语言这门新兴的编程语言进行...

    七牛 Go语言

    Win10预览版Mobile 10547即将发布 9月19日上午

    微软副总裁Gabriel Aul的Twitter透露了 Win10 Mobile预览版10536即将发布,他表示该版本已进入内部慢速版阶段,发布时间目...

    Win10 预览版

    HTML标签meta总结,HTML5 head meta 属性整理

    移动前端开发中添加一些webkit专属的HTML5头部标签,帮助浏览器更好解析HTML代码,更好地将移动web前端页面表现出来...

    移动端html5模拟长按事件的实现方法

    这篇文章主要介绍了移动端html5模拟长按事件的实现方法的相关资料,小编觉得挺不错的,现在分享给大家,也给大家...

    移动端 html5 长按

    HTML常用meta大全(推荐)

    这篇文章主要介绍了HTML常用meta大全(推荐),文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参...

    cdr怎么把图片转换成位图? cdr图片转换为位图的教程

    cdr怎么把图片转换成位图? cdr图片转换为

    cdr怎么把图片转换成位图?cdr中插入的图片想要转换成位图,该怎么转换呢?下面我们就来看看cdr图片转换为位图的...

    cdr 图片 位图

    win10系统怎么录屏?win10系统自带录屏详细教程

    win10系统怎么录屏?win10系统自带录屏详细

    当我们是使用win10系统的时候,想要录制电脑上的画面,这时候有人会想到下个第三方软件,其实可以用电脑上的自带...

    win10 系统自带录屏 详细教程

    + 更多教程 +
    WIN服务器linux服务器FTP服务器DNS服务器其他