Last month I have finished depth-first search and breadth-first search in Morpheus. Morpheus, based on hash distributed key-value store, requires to traversal vertices in distributed and even parallelised method.
The first thing comes to my mind is building a messaging passing infrastructure. Message passing infrastructure is almost the same as RPC in cluster connector, but use more succinct and determined data format to improve it's performance. Message passing is asynchronous, it means that sender does not wait for receiver to complete the actual processing, but for the delivery. Each message for a task have a 128 bit long ID for nodes to determinate the messages from the tasks. Each message also corresponding to one action, enables messages dispatcher to invoke the function bind to the action. Just like the RPC in cluster connector, if the message receiver is the same as sender, the message will not go through the network interface even will not been serialize or deserialize for transmission.
Both DFS and BFS algorithm are built based on this infrastructure. But their behaviours are vary. For example, DFS cannot been paralleled. To conduct DFS on distributed system, the stack to the visited and discovered nodes must be transfer to the nodes once at a time for updates. BFS, in the other hand, is able to paralleled by conduct each node to discover the vertices children they belongs to in each level. We will discuss how those 2 kinds of graph traversal been implemented in Morpheus in next few chapters.
DFS in Morpheus, adapted ideas from S.A.M. Makki and George Havas thesis. Passing vertices stack through nodes for update. This method is single threaded because depth-first search is inherently sequential. DFS in Morpheus is a good choice when the size of subgraph that the start vertex belongs is not very large. It is also suitable for conventional graph traversal for user queries like detect links and existence of path.
BFS in Morpheus, is more complicated. Currently, Morpheus supports parallel search in each level. Morpheus rely on nested fork-join multithread pattern, illustrated in the figure below
Consider the nodes contains vertices represents as A, B, C in 'Parallel Task I'; 'Main Thread' as the coordinator server (may be any one of the nodes in the cluster); 'Parallel Task I', 'Parallel Task II', 'Parallel Task III' as search task for each level. One BFS request will contain start vertex, end vertex and search rules. Here is what happend when the search begins.
- The coordinator first put the first vertex in the search list, send it to the node that belongs to the first vertex by it's id hashing and wait for the return message.
- The node received the search list and get the neighbour ids to the vertices in parallel according to search rules.
- When all parallel search for search ended, it send the search result to the coordinator as return message.
- The coordinator may receive return messages from multiply servers (it is not possible when there is only one vertex in the search list). When each of the return message arrived, it will tried to update local task status for each vertices, indicates whether have been visited, level, and parent
- After the coordinator received all of the return messages, it will extract the vertices for next level from local task status, partition the vertices by their servers into search lists and send them to their server
- Level advanced, step 2 will take the job. The iteration will continue until stop condition fulfilled or reached the maximum level in search rules when executing step 5
The idea to this algorithm was adapted from Lawrence National Laboratory on BlueGene. But I think I have not yet fully replicated their design, because in my implementation, coordinator will become the bottleneck in step 4 and 5; it will also cost a lot of memory to cache the vertices that have already been discovered, that's why I also provided on-disk cache for this occasion. BFS is used for finding shortest path, faster but more resource consuming path finding. It may also become the backbone of many other algorithms like fraud detection and community discovery.
To distribute task information like task id, parameters to each node in the cluster, we need a global shared hash map that any of it's changes can been updated to each node. I use paxos to do the job. There are also some other components like disk based hash map might come handy. I packed those computation related into hurricane component.