Ответ 1
Из вашего примера я беру вашу проблему в том, что вы хотите построить граф, который имеет ребро a → b, если только если в исходном графе была вершина v и ребра v → a и v → b. (Это может быть или не быть стандартным определением "соседа второй степени" в ориентированном графе, но в любом случае интересно.)
Здесь решение в Scala. Он создает граф со всеми исходными вершинами, но только с требуемыми ребрами. Я помещаю пустые строки как данные во все вершины и ребра.
Предполагая, что SparkContext sc, как обычно, попадает в оболочку Spark и доступен GraphX, настройте примерный граф:
val vertices: RDD[(VertexId, String)] =
sc.parallelize(Array((1L,""), (2L,""), (4L,""), (6L,"")))
val edges: RDD[Edge[String]] =
sc.parallelize(Array(Edge(1L, 2L, ""), Edge(1L, 4L, ""), Edge(1L, 6L, "")))
val inputGraph = Graph(vertices, edges)
Составьте чередующийся набор вершин, каждый из которых аннотируется множеством их преемников, поэтому в вашем примере v1 будет аннотироваться с помощью {v2, v4, v6}.
val verticesWithSuccessors: VertexRDD[Array[VertexId]] =
inputGraph.ops.collectNeighborIds(EdgeDirection.Out)
Создайте новый граф, используя эти вершины и исходные ребра.
val successorSetGraph = Graph(verticesWithSuccessors, edges)
Теперь нам нужно нажимать эти множества вдоль каждого ребра, создавая еще один набор вершин, на этот раз все аннотируются со своими соседями. Нам нужно комбинировать наборы в вершинах адресатов, поэтому с помощью Scala Set
удаляются дубликаты. Нам также нужно удалить каждую вершину из множества ее соседей, а значит, добавим дополнительные map
в конец.
val ngVertices: VertexRDD[Set[VertexId]] =
successorSetGraph.mapReduceTriplets[Set[VertexId]] (
triplet => {
Iterator((triplet.dstId, triplet.srcAttr.toSet))
},
(s1, s2) => s1 ++ s2
).mapValues[Set[VertexId]](
(id: VertexId, neighbors: Set[VertexId]) => neighbors - id
)
Теперь мы почти готовы создать окончательный график, но нам нужно ребро для каждого соседнего отношения:
val ngEdges = ngVertices.flatMap[Edge[String]](
{
case (source: VertexId, allDests: Set[VertexId]) => {
allDests.map((dest: VertexId) => Edge(source, dest, ""))
}
}
)
Теперь мы можем собрать все это вместе:
val neighborGraph = Graph(vertices, ngEdges)
Я уверен, что эксперт может сделать лучше, особенно с точки зрения производительности, но большинство идей, на которые это опирается, можно найти в Руководство по программированию GraphX.