Today, distributed compute engines are the backbone of many analytic, batch, and streaming applications. Spark provides many advanced features (pivot, analytic window functions, etc.) out of the box to transform data. Sometimes there is a need to process hierarchical data or perform hierarchical calculations. Many database vendors provide features like recursive CTEs (Common Table Expressions)[1] or the "connect by"[2] SQL clause to query and transform hierarchical data. Recursive CTEs are also known as recursive queries or parent-child queries. In this post, we will look at how we can address this need with Spark.
Hierarchical Data Overview –
Hierarchical relationships exist where one item of data is the parent of another item. Hierarchical data can be represented using a property graph model: every row is a vertex (node), a connection between a parent row and a child row is an edge (relationship), and the columns are the properties of the vertex.
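To make this mapping concrete, here is a minimal sketch (the vertex ids and employee names are made up for illustration) of how two parent-child rows become a GraphX property graph; it runs as-is in spark-shell:

```scala
import org.apache.spark.graphx._

// Two rows: EMP001 (Bob Baker) manages EMP002 (Jim Lake).
// Each row becomes a vertex whose properties come from the row's columns;
// each manager->employee link becomes a directed edge between vertex ids.
val vertices = sc.parallelize(Seq(
  (1L, ("EMP001", "Bob Baker")),
  (2L, ("EMP002", "Jim Lake"))
))
val edges = sc.parallelize(Seq(Edge(1L, 2L, "topdown")))
val graph = Graph(vertices, edges)
println(graph.triplets.count()) // 1 parent-child relationship
```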
Some Use Cases –
- Financial calculations – subaccounts rolling up into a parent account, all the way to the topmost account
- Creating an organizational hierarchy – manager-employee relationships, with a path
- Generating a graph of links between web pages, with a path
- Any kind of iterative computation involving linked data
Challenges –
Querying hierarchical data in a distributed system has some challenges:
- The data is connected, but it is distributed across partitions and nodes. The implementation should be optimized for performing iterations and moving the data (shuffles) as needed.
- The depth of the graph can vary over time – the solution should handle varying depths and should not force users to define the depth before processing.
Solution –
One of the ways of implementing recursive CTEs in Spark is using the GraphX Pregel API.
What is the GraphX Pregel API?
GraphX[3] is the Spark API for graphs and graph-parallel computation. Graph algorithms are iterative in nature, and the properties of a vertex depend upon the properties of the vertices it is directly or indirectly (via other vertices) connected to. Pregel is a vertex-centric graph processing model developed by Google, and Spark GraphX provides an optimized variant of the Pregel API[4].
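For reference, here is the signature of the pregel operator, abridged from the GraphX programming guide[4]; the three user-supplied functions in its second parameter list are exactly what we will implement below:

```scala
// VD and ED are the vertex and edge property types; A is the message type.
// vprog mutates a vertex's value when it receives a message, sendMsg emits
// messages along edge triplets, and mergeMsg combines messages bound for
// the same vertex.
def pregel[A](
    initialMsg: A,
    maxIterations: Int = Int.MaxValue,
    activeDirection: EdgeDirection = EdgeDirection.Either)(
    vprog: (VertexId, VD, A) => VD,
    sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
    mergeMsg: (A, A) => A): Graph[VD, ED]
```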
How does the Pregel API work?
Pregel API processing consists of executing supersteps:
Superstep 0:
- Pass the initial message to all the vertices
- Each vertex sends its value as a message to its directly connected vertices
Superstep 1:
- Receive the messages from the previous superstep
- Mutate the value
- Send the value as a message to the directly connected vertices
Repeat Superstep 1 as long as there are messages to pass, and stop when there are no more messages to be passed.
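Before the full solution, here is a minimal, self-contained sketch of this loop (the four-vertex graph is made up for illustration): Pregel computes each vertex's depth from the root, which is the core of the level calculation in the code further below:

```scala
import org.apache.spark.graphx._

// Toy graph: vertex 1 is the root (depth 0); the others start "unknown" (-1).
val vertices = sc.parallelize(Seq((1L, 0), (2L, -1), (3L, -1), (4L, -1)))
val edges = sc.parallelize(Seq(Edge(1L, 2L, ""), Edge(2L, 3L, ""), Edge(2L, 4L, "")))
val graph = Graph(vertices, edges)

val depths = graph.pregel(-1, Int.MaxValue, EdgeDirection.Out)(
  // vprog: keep the larger of the current depth and the incoming message
  (id, depth, msg) => math.max(depth, msg),
  // sendMsg: a vertex that knows its depth tells its children depth + 1,
  // but only if the child would learn something new, so the loop terminates
  triplet =>
    if (triplet.srcAttr >= 0 && triplet.dstAttr < triplet.srcAttr + 1)
      Iterator((triplet.dstId, triplet.srcAttr + 1))
    else Iterator.empty,
  // mergeMsg: if a vertex has multiple parents, keep the deepest path
  (a, b) => math.max(a, b)
)
depths.vertices.collect.foreach(println) // (1,0), (2,1), (3,2), (4,2) in some order
```

Note that a cycle would keep this toy version iterating forever; the full code below guards against cycles with an explicit iscyclic flag.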
Hierarchical Data for the Use Case –
The table below shows the sample employee data that we will use for generating a top-down hierarchy. Here the manager of an employee is represented by the mgr_id field, which holds emp_id values.

| emp_id | first_name | last_name | title | mgr_id |
|--------|------------|-----------|--------|--------|
| EMP001 | Bob | Baker | CEO | null |
| EMP002 | Jim | Lake | CIO | EMP001 |
| EMP003 | Tim | Gorab | MGR | EMP002 |
| EMP004 | Rick | Summer | MGR | EMP002 |
| EMP005 | Sam | Cap | Lead | EMP004 |
| EMP006 | Ron | Hubb | Sr.Dev | EMP005 |
| EMP007 | Cathy | Watson | Dev | EMP006 |
| EMP008 | Samantha | Lion | Dev | EMP007 |
| EMP009 | Jimmy | Copper | Dev | EMP007 |
| EMP010 | Shon | Taylor | Intern | EMP009 |
We will add the following columns as part of the processing:

| Column | Description |
|--------|-------------|
| Level (depth) | The level at which the vertex stands in the hierarchy |
| Path | The path from the topmost vertex to the current vertex in the hierarchy |
| Root | The topmost vertex in the hierarchy; useful when multiple hierarchies exist in the dataset |
| Iscyclic | In case there is bad data and a cyclic relationship exists, flag it |
| Isleaf | If a vertex has no children, flag it |
Code –
```scala
// The code below demonstrates use of the GraphX Pregel API - Scala 2.11+
// Run in spark-shell (use :paste); in a standalone app, also import spark.implicits._
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.DataFrame
import scala.util.hashing.MurmurHash3

// functions to build the top-down hierarchy

// setup & call the Pregel API
def calcTopLevelHierarchy(vertexDF: DataFrame, edgeDF: DataFrame): RDD[(Any, (Int, Any, String, Int, Int))] = {

  // create the vertex RDD: primary key, root, path
  val verticesRDD = vertexDF
    .rdd
    .map { x => (x.get(0), x.get(1), x.get(2)) }
    .map { x =>
      (MurmurHash3.stringHash(x._1.toString).toLong,
        (x._1.asInstanceOf[Any], x._2.asInstanceOf[Any], x._3.asInstanceOf[String]))
    }

  // create the edge RDD: top-down relationship
  val edgesRDD = edgeDF.rdd.map { x => (x.get(0), x.get(1)) }
    .map { x =>
      Edge(MurmurHash3.stringHash(x._1.toString).toLong,
        MurmurHash3.stringHash(x._2.toString).toLong, "topdown")
    }

  // create the graph
  val graph = Graph(verticesRDD, edgesRDD).cache()

  val pathSeparator = """/"""

  // initial message: id, level, root, path, iscyclic, isleaf
  val initialMsg = (0L, 0, 0.asInstanceOf[Any], List("dummy"), 0, 1)

  // add more attributes to the vertices:
  // id, level, root, path, isCyclic, existing value of the current vertex (to build the path), isleaf, pk
  val initialGraph = graph.mapVertices((id, v) => (id, 0, v._2, List(v._3), 0, v._3, 1, v._1))

  val hrchyRDD = initialGraph.pregel(initialMsg, Int.MaxValue, EdgeDirection.Out)(setMsg, sendMsg, mergeMsg)

  // build the path string from the list
  val hrchyOutRDD = hrchyRDD.vertices.map { case (id, v) =>
    (v._8, (v._2, v._3, pathSeparator + v._4.reverse.mkString(pathSeparator), v._5, v._7))
  }
  hrchyOutRDD
}

// mutate the value of the vertices (the vertex program)
def setMsg(vertexId: VertexId,
           value: (Long, Int, Any, List[String], Int, String, Int, Any),
           message: (Long, Int, Any, List[String], Int, Int)): (Long, Int, Any, List[String], Int, String, Int, Any) = {
  if (message._2 < 1) { // superstep 0 - initialize the level
    (value._1, value._2 + 1, value._3, value._4, value._5, value._6, value._7, value._8)
  } else if (message._5 == 1) { // set the isCyclic flag
    (value._1, value._2, value._3, value._4, message._5, value._6, value._7, value._8)
  } else if (message._6 == 0) { // clear the isleaf flag - this vertex has children
    (value._1, value._2, value._3, value._4, value._5, value._6, message._6, value._8)
  } else { // set the new values propagated from the parent
    (message._1, value._2 + 1, message._3, value._6 :: message._4, value._5, value._6, value._7, value._8)
  }
}

// send the value to vertices
def sendMsg(triplet: EdgeTriplet[(Long, Int, Any, List[String], Int, String, Int, Any), _]): Iterator[(VertexId, (Long, Int, Any, List[String], Int, Int))] = {
  val sourceVertex = triplet.srcAttr
  val destinationVertex = triplet.dstAttr
  // check for a cycle: the ancestor id carried by the source has come back around to the destination
  if (sourceVertex._1 == triplet.dstId || sourceVertex._1 == destinationVertex._1) {
    if (destinationVertex._5 == 0) { // set iscyclic
      Iterator((triplet.dstId, (sourceVertex._1, sourceVertex._2, sourceVertex._3, sourceVertex._4, 1, sourceVertex._7)))
    } else {
      Iterator.empty
    }
  } else {
    if (sourceVertex._7 == 1) {
      // the source has an outgoing edge, so it is NOT a leaf - tell it to clear its isleaf flag
      Iterator((triplet.srcId, (sourceVertex._1, sourceVertex._2, sourceVertex._3, sourceVertex._4, 0, 0)))
    } else { // propagate the new values to the child
      Iterator((triplet.dstId, (sourceVertex._1, sourceVertex._2, sourceVertex._3, sourceVertex._4, 0, 1)))
    }
  }
}

// combine the values received from all connected vertices
def mergeMsg(msg1: (Long, Int, Any, List[String], Int, Int),
             msg2: (Long, Int, Any, List[String], Int, Int)): (Long, Int, Any, List[String], Int, Int) = {
  // dummy logic, not applicable to the data in this use case (each vertex has a single parent)
  msg2
}

// Test with some sample data
val empData = Array(
  ("EMP001", "Bob", "Baker", "CEO", null.asInstanceOf[String]),
  ("EMP002", "Jim", "Lake", "CIO", "EMP001"),
  ("EMP003", "Tim", "Gorab", "MGR", "EMP002"),
  ("EMP004", "Rick", "Summer", "MGR", "EMP002"),
  ("EMP005", "Sam", "Cap", "Lead", "EMP004"),
  ("EMP006", "Ron", "Hubb", "Sr.Dev", "EMP005"),
  ("EMP007", "Cathy", "Watson", "Dev", "EMP006"),
  ("EMP008", "Samantha", "Lion", "Dev", "EMP007"),
  ("EMP009", "Jimmy", "Copper", "Dev", "EMP007"),
  ("EMP010", "Shon", "Taylor", "Intern", "EMP009")
)

// create a dataframe with some partitions
val empDF = sc.parallelize(empData, 3).toDF("emp_id", "first_name", "last_name", "title", "mgr_id").cache()

// primary key, root, path - dataframe to graphx for vertices
val empVertexDF = empDF.selectExpr("emp_id", "concat(first_name,' ',last_name)", "concat(last_name,' ',first_name)")

// parent to child - dataframe to graphx for edges
val empEdgeDF = empDF.selectExpr("mgr_id", "emp_id").filter("mgr_id is not null")

// call the function
val empHierarchyExtDF = calcTopLevelHierarchy(empVertexDF, empEdgeDF)
  .map { case (pk, (level, root, path, iscyclic, isleaf)) =>
    (pk.asInstanceOf[String], level, root.asInstanceOf[String], path, iscyclic, isleaf)
  }
  .toDF("emp_id_pk", "level", "root", "path", "iscyclic", "isleaf").cache()

// extend the original table with the new columns
val empHierarchyDF = empHierarchyExtDF
  .join(empDF, empDF.col("emp_id") === empHierarchyExtDF.col("emp_id_pk"))
  .selectExpr("emp_id", "first_name", "last_name", "title", "mgr_id", "level", "root", "path", "iscyclic", "isleaf")

// print
empHierarchyDF.show()
```
Output –
Running the code above should produce a hierarchy along these lines (row order may vary):

| emp_id | first_name | last_name | title | mgr_id | level | root | path | iscyclic | isleaf |
|--------|------------|-----------|--------|--------|-------|------|------|----------|--------|
| EMP001 | Bob | Baker | CEO | null | 1 | Bob Baker | /Baker Bob | 0 | 0 |
| EMP002 | Jim | Lake | CIO | EMP001 | 2 | Bob Baker | /Baker Bob/Lake Jim | 0 | 0 |
| EMP003 | Tim | Gorab | MGR | EMP002 | 3 | Bob Baker | /Baker Bob/Lake Jim/Gorab Tim | 0 | 1 |
| EMP004 | Rick | Summer | MGR | EMP002 | 3 | Bob Baker | /Baker Bob/Lake Jim/Summer Rick | 0 | 0 |
| EMP005 | Sam | Cap | Lead | EMP004 | 4 | Bob Baker | /Baker Bob/Lake Jim/Summer Rick/Cap Sam | 0 | 0 |
| EMP006 | Ron | Hubb | Sr.Dev | EMP005 | 5 | Bob Baker | /Baker Bob/Lake Jim/Summer Rick/Cap Sam/Hubb Ron | 0 | 0 |
| EMP007 | Cathy | Watson | Dev | EMP006 | 6 | Bob Baker | /Baker Bob/Lake Jim/Summer Rick/Cap Sam/Hubb Ron/Watson Cathy | 0 | 0 |
| EMP008 | Samantha | Lion | Dev | EMP007 | 7 | Bob Baker | /Baker Bob/Lake Jim/Summer Rick/Cap Sam/Hubb Ron/Watson Cathy/Lion Samantha | 0 | 1 |
| EMP009 | Jimmy | Copper | Dev | EMP007 | 7 | Bob Baker | /Baker Bob/Lake Jim/Summer Rick/Cap Sam/Hubb Ron/Watson Cathy/Copper Jimmy | 0 | 0 |
| EMP010 | Shon | Taylor | Intern | EMP009 | 8 | Bob Baker | /Baker Bob/Lake Jim/Summer Rick/Cap Sam/Hubb Ron/Watson Cathy/Copper Jimmy/Taylor Shon | 0 | 1 |
Under the hood –
A Spark application breaks up into job(s), stage(s), and task(s). The Pregel API internally generates multiple jobs due to its iterative nature: a job is generated every time messages are passed to the vertices. Each job may end up with multiple shuffles, as connected data can live on different nodes.
Things to watch out for are the long RDD lineages that build up across iterations when working with huge datasets; checkpointing is one way to keep them in check.
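A minimal sketch of that mitigation, assuming the graph built inside calcTopLevelHierarchy above (the checkpoint directory path is made up):

```scala
// Truncate the RDD lineage by checkpointing the graph before iterating.
// The directory is an assumption - point it at reliable storage (e.g., HDFS) in production.
sc.setCheckpointDir("/tmp/spark-checkpoints")
graph.cache()            // keep vertices and edges in memory across iterations
graph.checkpoint()       // Graph.checkpoint() writes vertices and edges to the checkpoint dir
graph.vertices.count()   // an action forces the checkpoint to materialize
```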
Summary –
The GraphX Pregel API is very powerful and can be used to solve problems that are iterative in nature, as well as other graph computations.
References –
1. SQL Server Recursive CTEs – https://technet.microsoft.com/en-us/library/ms186243(v=sql.105).aspx
2. Oracle CONNECT BY SQL clause – https://docs.oracle.com/cd/B19306_01/server.102/b14200/queries003.htm
3. GraphX – https://spark.apache.org/docs/latest/graphx-programming-guide.html
4. Pregel API – https://spark.apache.org/docs/latest/graphx-programming-guide.html#pregel-api
Other References –
https://mapr.com/blog/how-get-started-using-apache-spark-graphx-scala/