应用motif模式查询
图分析有两种形式:图算法和图模式查询。
GraphFrame集成了图算法和图查询,支持跨图和Spark SQL查询的优化,而不需要将数据移动到专门的图数据库。如下图所示。
简单motif查询
Graph motif是在图中重复出现的子图或模式,表示顶点之间的交互或关系。图查询在图中搜索符合motif模式的结构,找到motif可以帮助用户执行查询来发现图中的结构模式。例如,用户可以使用Motif来分析用户所购买产品的网络关系图,根据表示产品的图的结构属性及其属性和它们之间的关系洞察用户行为(找出经常同时购买的商品)。这些信息可用于推荐和/或广告引擎。
例如,可以搜索这样的模式:A关注了 B,B 关注了 C,但是A并不关注C。找到这样的结果后,就可以把C推荐给A。如下图所示。
Motif的语法形式如下:
g.find("(start)-[pass]->(end)")
其中g为图对象,start为起点,pass为经过的边,end为目标点,顶点用括号()表示,边用方括号[]表示。例如,指定GraphFrames Motif查询模式,用来找到从A到B并且从B到C,但没有从A到C的边的结构,代码如下:
graph.find("(a)-[]->(b); (b)-[]->(c);!(a)-[]->(c)")
继续上一节中的示例,使用motifs来构建包含边和顶点的更复杂的关系。
例如,找到在两个方向上都有边的顶点对,结果是一个DataFrame,其中的列名是motif键。代码如下:
val motifs = g.find("(a)-[e]->(b); (b)-[e2]->(a)")
motifs.show(false)
执行以上的代码,输出结果如下:
+----------------------------------+--------------+----------------------------------+--------------+ |a |e |b |e2 | +----------------------------------+--------------+----------------------------------+--------------+ |[b, Bob, 36, 23232323, Bananas] |[b, c, follow]|[c, Charlie, 30, 2123, Grapefruit]|[c, b, follow]| |[c, Charlie, 30, 2123, Grapefruit]|[c, b, follow]|[b, Bob, 36, 23232323, Bananas] |[b, c, follow]| +----------------------------------+--------------+----------------------------------+--------------+
注意,查询条件中,a和b可以指代相同的顶点。如果需要限制为不同的顶点,需要在返回结果中使用过滤器。
由于执行结果是一个DataFrame,所以可以在motif之上构建更复杂的查询。例如,找出30岁以上的所有相互关系,代码如下:
val filtered = motifs.filter("b.age > 30")
filtered.show(false)
执行以上代码,输出结果如下:
+----------------------------------+--------------+-------------------------------+--------------+ |a |e |b |e2 | +----------------------------------+--------------+-------------------------------+--------------+ |[c, Charlie, 30, 2123, Grapefruit]|[c, b, follow]|[b, Bob, 36, 23232323, Bananas]|[b, c, follow]| +----------------------------------+--------------+-------------------------------+--------------+
不需要返回路径中的元素时,可以使用匿名顶点和边,代码如下:
val motif = g.find("(start)-[]->()")
motif.show(false)
执行以上代码,输出结果如下:
+----------------------------------+ |start | +----------------------------------+ |[a, Alice, 34, 234, Apples] | |[b, Bob, 36, 23232323, Bananas] | |[c, Charlie, 30, 2123, Grapefruit]| |[f, Fanny, 36, 333, Apples] | |[e, Esther, 32, 1, Watermelon] | |[e, Esther, 32, 1, Watermelon] | |[d, David, 29, 2321111, Bananas] | |[a, Alice, 34, 234, Apples] | +----------------------------------+
如果只查询单向路径,使用下面的模式:
val motif = g.find("(a)-[]->(b); !(b)-[]->(a)")
motif.show(false)
执行以上代码,输出结果如下:
+--------------------------------+----------------------------------+ |a |b | +--------------------------------+----------------------------------+ |[f, Fanny, 36, 333, Apples] |[c, Charlie, 30, 2123, Grapefruit]| |[a, Alice, 34, 234, Apples] |[e, Esther, 32, 1, Watermelon] | |[e, Esther, 32, 1, Watermelon] |[f, Fanny, 36, 333, Apples] | |[d, David, 29, 2321111, Bananas]|[a, Alice, 34, 234, Apples] | |[e, Esther, 32, 1, Watermelon] |[d, David, 29, 2321111, Bananas] | |[a, Alice, 34, 234, Apples] |[b, Bob, 36, 23232323, Bananas] | +--------------------------------+----------------------------------+
执行有条件的路径搜索,代码如下:
val paths = g.find("(a)-[e]->(b)")
.filter("e.relationship = 'follow'")
.filter("a.age < b.age")
paths.show
执行以上代码,输出结果如下:
+----------------------------------+--------------+-------------------------------+ |a |e |b | +----------------------------------+--------------+-------------------------------+ |[c, Charlie, 30, 2123, Grapefruit]|[c, b, follow]|[b, Bob, 36, 23232323, Bananas]| |[e, Esther, 32, 1, Watermelon] |[e, f, follow]|[f, Fanny, 36, 333, Apples] | +----------------------------------+--------------+-------------------------------+
进一步选择关系数据集中的列,代码如下:
val e2 = paths.select("e.src", "e.dst", "e.relationship")
// val e2 = paths.select("e.*")
e2.show()
执行以上代码,输出结果如下:
+---+---+------------+ |src|dst|relationship| +---+---+------------+ | c| b| follow| | e| f| follow| +---+---+------------+
状态查询
无状态查询,没有指定任务限制条件,虽易于表达,但只能在查询完后再进行过滤,会返回一个较大的数据集。大多数motif查询是无状态的。
Motif支持更复杂的查询,这些查询沿着motif中的路径携带状态。通过将GraphFrame motif查找与结果集上的过滤器相结合来表达这些查询,过滤器使用序列操作来构造一系列DataFrame列。
例如,要在图g中查找4个顶点(人)构成的关系链,要求其中至少有两个是friend(朋友)关系。在本例中,要维护的状态是属性为friend的边的计数,代码如下:
// 查找4个顶点构成的关系链, 要求其中至少有两个是“friend”-朋友关系
val chain4 = g.find("(a)-[ab]->(b); (b)-[bc]->(c); (c)-[cd]->(d)")
chain4.printSchema()
// 序列查询,带状态(cnt)
//(a) 定义每一个顶点更新状态的方法:如果关系为friend则cnt+1
def sumFriends(cnt: Column, relationship: Column): Column = {
when(relationship === "friend", cnt + 1).otherwise(cnt)
}
//(b) 将更新方法应用到整个链的,链上每有一个关系是friend就加一,链上共三个关系(3条边)
val numFriends = Seq("ab", "bc", "cd")
.foldLeft(lit(0))((cnt, e) => sumFriends(cnt, col(e)("relationship")))
//(c) 传入限制条件:对DataFrame应用过滤器
val chainWith2Friends = chain4.where(numFriends >= 2)
chainWith2Friends.show()
执行以上代码,输出结果如下:
+--------------------+--------------+--------------------+--------------+--------------------+--------------+--------------------+ | a| ab| b| bc| c| cd| d| +--------------------+--------------+--------------------+--------------+--------------------+--------------+--------------------+ |[e, Esther, 32, 1...|[e, d, friend]|[d, David, 29, 23...|[d, a, friend]|[a, Alice, 34, 23...|[a, e, friend]|[e, Esther, 32, 1...| |[e, Esther, 32, 1...|[e, d, friend]|[d, David, 29, 23...|[d, a, friend]|[a, Alice, 34, 23...|[a, b, friend]|[b, Bob, 36, 2323...| |[d, David, 29, 23...|[d, a, friend]|[a, Alice, 34, 23...|[a, e, friend]|[e, Esther, 32, 1...|[e, d, friend]|[d, David, 29, 23...| |[d, David, 29, 23...|[d, a, friend]|[a, Alice, 34, 23...|[a, e, friend]|[e, Esther, 32, 1...|[e, f, follow]|[f, Fanny, 36, 33...| |[d, David, 29, 23...|[d, a, friend]|[a, Alice, 34, 23...|[a, b, friend]|[b, Bob, 36, 2323...|[b, c, follow]|[c, Charlie, 30, ...| |[a, Alice, 34, 23...|[a, e, friend]|[e, Esther, 32, 1...|[e, d, friend]|[d, David, 29, 23...|[d, a, friend]|[a, Alice, 34, 23...| +--------------------+--------------+--------------------+--------------+--------------------+--------------+--------------------+
如果想进一步统计每个查找出的关系链中的朋友数量,可以修改上面的代码如下:
val chainWith2Friends = chain4
.select("ab","bc","cd")
.withColumn("num_friends", numFriends)
.where("num_friends>=2")
chainWith2Friends.show()
执行以上代码,输出结果如下:
+--------------+--------------+--------------+-----------+ | ab| bc| cd|num_friends| +--------------+--------------+--------------+-----------+ |[e, d, friend]|[d, a, friend]|[a, e, friend]| 3| |[e, d, friend]|[d, a, friend]|[a, b, friend]| 3| |[d, a, friend]|[a, e, friend]|[e, d, friend]| 3| |[d, a, friend]|[a, e, friend]|[e, f, follow]| 2| |[d, a, friend]|[a, b, friend]|[b, c, follow]| 2| |[a, e, friend]|[e, d, friend]|[d, a, friend]| 3| +--------------+--------------+--------------+-----------+
在上面这个示例中,查找由4个顶点组成的关系链。在这4个顶点a->b->c->d的关系链中,找出匹配这个过滤条件的子集,其实现过程如下:
- (1) 初始化路径上的状态。
- (2) 基于顶点a更新状态。
- (3) 基于顶点b更新状态。
- (4) 基于顶点c更新状态。
- (5) 基于顶点d更新状态。
- (6) 对最终状态结果进行过滤。如果最终状态匹配某个条件,则该关系链被过滤器接受。