Building a Graph Engine -- Computation Engines

Morpheus is a distributed graph engine. Once I resolved it's  storage problem, I start to build it's computation engines. If I just leave this project in current stage as a pure high-performance graph storage, user has to write their own algorithms. Most importantly, client side traversal may cause performance issues on communications.

Currently Morpheus use message passing to establish BSP and any other traversal based computation models. To reduce the size of each message, Morpheus can distribute task parameters to all of the machines in the cluster. The current commit have already contains distributed depth first search for path search, construct sub-graph for vertex or bulk update. but I have't finished all of them yet because the condition to stop the search or update actions require the existence of query language in the system like Cypher in Neo4j, which I am working on to provide the flexible functionality in search and updates.

I haven't decided the format of the query language. Lisp-like expression (S-expression or Polish notation) is preferable because of it's easy implementation in parser. I will not simply eval the code due to security concern and performance (eval in Clojure will generate new class, it's life-cycle is not manageable and possibly leads to memory leak). The problem of S-expression is that there a few people get used to such expressions. People usually read 1 + 1 = 2, but an expression like (= (+ 1 1) 2) looks strange to them. Another advantage of s-expression is it is more succinct when 1 + 2 + 3 + 4 can be represented as (+ 1 2 3 4).

Morpheus also have a code distribute feature. Users can write their plain code, even pack as Leiningen project, send them to the cluster and run remotely without compile anything. It is useful if users want to run their own algorithms in the cluster. There should be a sort of API or framework, and I have done it yet.

Future plan includes a RDD based computation module for bulk data processing and map-reduce computation models, like GraphX in Apache Spark. It may consider vertices and edges as documents, users can join the documents with other sources. The computation model may not comply graph traversal models, but it is useful When relations are not the first class problems in the cases. The RDD module may have a dedicated memory management policy because the memory spaces required for computation are unpredictable. RDDs may require to temperately flush into disk instead of persist in the memory. For example, in LRU manner.

Computation models seems a much more challenging topic in this project. And I have so few time to finish the job.That is the reason I considered the Morpheus project as my second personal long-term project (the first the the WebFusion project, it was paused, will continue after the Morpheus project).

 

Servers are crashing, bad memory modules or hot weather?

I cannot figure out the reason, but recently, 2 of my servers both with 8 x 16GB of Kingstone DDR3 1600 MHz ECC REG memory give me the correctable ECC memory error messages. One of the server can recover by itself, another is totally dead with stucking at following screen on startup.

url_redirect

I used to resolve such problems by find and pop out the problematic modules. The server that cannot recover by itself is the server that I put it to the data center as a long term running server. I suffered 6 times of memory failure from different modules that was plugged into the same slot. Last year, I start to suspect there should be cooling problem, while there may have no air flow through the module and it gets overheat. The chassis for the server is a 1U with 4 hard drive. I invested a lot of resource on this machine, makes the small space crowded with 2 more SSDs for caching and 1 more shell less router for IPMI. Once I unplugged the memory module that cannot get any air flow, it was stabilized for a while (nearly 1 year), and got crashed today.

Another server as proving ground, computation server and virtual machine server in my home was observed that it has familiar problem when I testing Morpheus with wikidata graph on it. Instead of crashing, it can resolve the problem by itself but leave following messages in my server log

Apr 30 18:46:00 shisoft-vm kernel: [90407.431330] EDAC sbridge MC1: HANDLING MCE MEMORY ERROR
Apr 30 18:46:00 shisoft-vm kernel: [90407.431337] EDAC sbridge MC1: CPU 8: Machine Check Event: 0 Bank 5: 8c00004000010090
Apr 30 18:46:00 shisoft-vm kernel: [90407.431338] EDAC sbridge MC1: TSC 0
Apr 30 18:46:00 shisoft-vm kernel: [90407.431340] EDAC sbridge MC1: ADDR 1d7a258b00 EDAC sbridge MC1: MISC 204a167686
Apr 30 18:46:00 shisoft-vm kernel: [90407.431342] EDAC sbridge MC1: PROCESSOR 0:206d5 TIME 1462013160 SOCKET 1 APIC 20
Apr 30 18:46:00 shisoft-vm kernel: [90407.659365] EDAC MC1: 1 CE memory read error on CPU_SrcID#1_Ha#0_Chan#2_DIMM#0 (channel:2 slot:0 page:0x1d7a258 offset:0xb00 grain:32 syndrome:0x0 - area:DRAM err_code:0001:0090 socket:1 ha:0 channel_mask:4 rank:1)

If I unplugged the modules that was indicated as problematic, other modules fails in the next round of tests. That leaves me no other options but ignore it.

I have another machine with 2 x 16GB of Samsung 2133 MHz DDR4 REG ECC memory. Which was assembled in the beginning of last year does not have such problems even it's memory was exhausted and start to taking swap. I highly suspect the failures may been caused by the heat or maybe my hardware provider did not give me the qualified parts (motherboard may also cause such problems).

Right now, I decided to upgrade the machine with Samsung memory to 96GB, and one piece of Intel 750 400GB SSD as secondary storage for project Morpheus. I also planned to replace the machine in the data center with new one. My new server will take more care of head sink problems, hope it won't be so annoying in the future.

I don't suggest purchase hardware and place in the data center when cloud platforms (for example DigitalOcean and Amazon EC2) are affordable for their applications. My use cases are harsh, I have to customize my servers to balance performance and prices, and also have to manager server hardware problems by myself.

Nebuchadnezzar, finally evolved

After one month of desperate tests and adjustments (I nearly crashed one of the hard drive in my proving ground machine), I think I have found a reasonable design for the key value store.

The hard part of this project is memory management. My previous design is to allocate a large amount of memory for each trunk and append data cell to one append header. When a cell was deleted or obsoleted, there will be one defragmentation thread, moving all living cell to fill the fragments. That means if there is a fragment at the very beginning of the trunk, most of the cells in the trunk will be moved in one cycle and the fragment spaces only reclaims after one round of the defragmentation. Usually the fragment spaces reclaims so slow that the whole trunk cannot take updates operations too frequently, I used to apply slow mode in the system, but finally find it was a stupid idea.

I also tried  to fill the new cells into fragments. But it causes performance issues. Defragmentation process have to compete with other operation threads for fragments, there must be locks to keep consistent, that can heavily slow down the whole system.

After I wrote my previous blog in Celebrate Ubuntu Shanghai Hackathon, I nearly ditched the design I mentioned before and trying to implement partly log-structured design that was seen from RAMCloud. It took me only one night to do rewirte that part. I does not fully followed that design in this paper. Especially I does not append tombstones into the log. Instead, I write tombstones into the places that belongs the deleted cell, because the paper spent large amount of paragraph on about how to track tombstones and reclaim their spaces. And I found it has too much overhead. The major improvements compare to the original design is dividing one trunk into 8M segments. Each segment tracks their own dead and alive objects, and the maximum object size reduced from 2GB to 1MB. This design was proved to be efficient and manageable. Segmentation makes it possible to start multiply threads and defragment segments in one trunk in parallel. It significantly improved the performance on defragmentation. When a writer trying to acquire new space from trunks, it only need to search for one segment that has sufficient space. In most common situations, defragmentation process can always been finished on time.

I also redesigned the backup process. When trunks are divided into segments, synchronizing memory data into disks are much more straight forward. There is one thread for each trunk keep detecting dirty segments and send them to remote servers for backup once at a time. Dirty segment means the segments contains new cells. Segments with only tombstones will not considered as dirty and need to backup because the system tracks versions for each cell. In recover process, cells with lower version will be replaced by higher version, but higher one will not been replaced by lower one.

As in the application level, 1MB object size produce limitation. In project Morpheus, it is highly possible for one cell to exceed this limitation. Especially in edge list, one vertex may have large amount of edges in or out. But this limitation can be overcomed by implement linked list. Each edge list contains an extra field indicates next list cell id. When the list require to be iterated, we can follow the links to retrieve all of the edges.

Although, there is still some bugs in Nebuchadnezzar and I don't meant to recommend it to be used in production, but I glad to find that this project made some progress, instead of become another of my short term fever project.

Some thought about storage system for the in-memory key-value store

I was struggling for how to implement the durability feature for the key-value store. I have finished my first attempt of write memory data into stable storage. The paper from Microsoft did not give me much clue because it just mentioned that they designed a file system called TFS, which is similar with HDFS from Apache Hadoop. It did not give any more information about how and when to write memory data into the file system. My first and current design for memory management followed their implementation. After some tests by importing large amount of data into the store and then frequently update the inserted cells, I realised the original design from Trinity is problematic.

Memory management, also named defragmentation is used to reclaim spaces from deleted cells. Especially in update operations, the store will mark the original spaces as fragment and append new cell to the append header. If the spaces did not been reclaimed in time, the container will been filled and no more data can been written into. The approach in my previous presented article have major flaws in copying all of cells that follows the position of the fragment. That means if we mark the first cell in the trunk as fragment, in the defragmentation process, all the rest of the cells in the trunk will been copied. In some harsh environment,  the defragmentation process will never catch up.

The key-value store also supports durability. Each trunk will have a mirrored backup file in multiply remote servers to ensure high availability. Each operation on the trunk memory space, including defragmentation will mark the modified region of memory space in a skiped list buffer as dirty, order by the memory address offset to the operation. After certain interval of time, a daemon will flush all of the dirty data and their address to remote backup servers. Backup servers will take those data in a buffer and a daemon will take data from buffer and write then to disk asynchronously. With such defragmentation approach, each defragment operation will also leads to updating all of the copied cells. That is a huge io performance issue.

Then I read the paper from Stanford to find more feasible solution. The author provided full details on log-structured memory management. The main feature of this approach is to make memory space into append only segments. In cleaning process, the cleaner will make full empty segments for reuse. Each operation, including add, delete and updates are all appended logs. The system maintains a hash table to locate cells in segments. The cleaner is much complex than defragmentation in my key-value store, but it provides possibility to minimize  memory copy and parallel cleanning. The downside is the maximum size of each cell is limited by segment size by 1MB when my current approach can provide maximum 2GB for each cell (even it is not possible to have such big cells) and maintaining segments costs memory, especially when the memory space is large, the number of segments increases accordingly. I prefer to give the log-structured approach a try because defragmentation will become major bottleneck in current design, and large object is not essential in current use cases.

Manage memory is far more complex that I was thought in the first time. Other system like JVM and other programming languages took years on improving their GC, each improvements may reduce pause by 1 ~ 2x. I cannot tell which is the best solution without proper tests because each solution seems specified for certain use cases. I think I can do more research on this topic in the future.

Building a Graph Engine -- Key-Value Store -- Nested Schema

In this post I explained the reason of why it is necessary to define schema before insert any data into the key-value store. The previous solution did achieved the goal of compact arrangement. User can define a map schema in a DSL like this for a student record:

[[:id :long] [:name :text] [:school :text] [:enrollment :int-array]]

The a map data like this will fit in the schema

{:id 1, :name "Jack", :school "DHU", :enrollment [2010 2011 2012 2013]}

Most of the relational database which need to define schema or tables works like this, users can define a one-layer schema which it's data types are primitive or arrays at most. If we need to present a map like this:

{:id 1, :name "Jack", :school "DHU",
:scores {:developing 99
:data-structure 87
:math 60}}}

the 'score' field must been placed in another table and leave only a identifier in the student table.

But sometimes we need to embed additional information in one document rather than use identifier to link two items. For example, in Morpheus, we need to define an array which each items have a long and id type field for edge lists[1]. In this case we need to define schema in an nested manner. In the student record example, we can describe the schema with scores like this

[[:id :long] [:name :text] [:school :text]
[:scores [[:developing :short]
[:data-structure :short]
[:math :short]]]

This schema is just right for the student record with scores we seen. It looks like what MongoDB was designed for, just without storing key names and structure of each field in the map and their children. We can utilize this feature to save more spaces from repeatedly describe what is the data represents for in the memory.

The key-value also store supports array of nested schema and array in nested schema. It allows almost infinite combination, which should be flexible enough to model any data structure.

There are two parts of the actual implementation of this feature, the writer and the reader.

In the writer there is a planner responsible for taking data and schema, compile them into a flattened sequential instructions contains field data, data type (encoder) and data bytes length. Next, we just need to write field data according to those instructions one after another, without any gaps and tags.

Reader takes the location of the cell, find out the cell schema and reconstruct data from the schema. Both writer and reader require walk on the schema recursively.

There is a downside of this feature, which is it performance is lower than the one-layer schema. It is also harder to do optimization (Precompile. I will cover this later). But it did provide both memory efficiency and flexibility.

For more exact details, please read this.

Building a Graph Engine -- Modeling Graph: Vertexes, Edges and their relations

The graph engine was designed to build graph data models based on key-value schema, because the engine use identifier list in the vertex to make connections to other vertex or edges. It is efficient to fetch data according to the identifier of a key-value pair from a hash map than do any traditional search in an array. Every unit, including edges and vertex are store as key value pair in the memory store in a very compact way (see this article). In a data explore process, it should be deliver much more better performance than traditional relational databases. It is also flexible and able to model hyper graph, that one edge may have multiply vertex inbound and outbound.

For performance concern, how to present edges in vertex is an interesting topic. The problem is there may be many different kind on edges. For example, when we model a school roster, one student should have multiply kinds of relation to other matters like his class, his friends, his teachers, this scores. Usually we only need to know one kind of his relation like how many friends he has. If we put every edges in one list, it will be too much overhead to extract all of the edges that was linked with the vertex to find only one type of edges. One solution is to define the possible edge types in the schema like what we see how Trinity models movie and actor graph.Snip20160228_2But when the datasets increasing, one entity of a vertex may have more relations and edges to other vertexes over time, model edges like this will leads to alter the cell schema for the vertex frequently, which also leads to rebuild the whole data sets of the the schema in the store. I have studied neo4j and OrientDB, finally decided to develop my own approach, to store array map in vertex for each type of edges to a identifier to edge list cell. The edge list cell contains all of the edges of this type related to the vertex. A vertex contains 3 such array map for inbound, outbound and indirected edges. It can also contains more fields for other data that was defined in the vertex schema.
IMG_0053  Edge list cell has only one field, that is the list of the identifiers to other vertex or edges. For best performance, the engine allows both vertex id (for simple edges) or edge/hyper edges with additional document id for more complex models. The engine can determinate the type of the cell based on it's schema id from the store.IMG_0054_

After designed such structure for graph model, I found that the store still have limitations in custom types like array map. I need to dedicate more time on improving the key-value store, supporting nested schema will suit my needs.

Building a Graph Engine -- Key-Value Store -- Defragmentation

The key-value store to the graph engine did not use any native way in the programming language for memory efficiency concern. Instead, I use a constant size memory space to store all of the data.

In the beginning, it was good, when we add record to the space, the cells arranged tight and their location as well as their id was store in another HashMap as index. Then, if we delete any of the cells before the last one will make gaps between cells. Because I use append header to allocate new position for new cells, before reclaim those gaped spaces and reset the append header, the location of append header will continually increase and pop out of the store boundary.

Defragmentation seems a classical procedure in data base and operating systems. It is my first time to build such system through, I was first stuck in this part, but after some drawing the idea was pretty clear. I was able to finish this job in the Chinese new year eve (Yes, I write my code even when the others are celebrating and on vacation).
IMG_0049

The digram I drawn was shown above. There is a concurrent skip list map in trunks to record on fragments. When adding fragments, there is a check to detect adjacent fragments and merge them into one, like we seen in the 4th and 5th row in the digram. Defragmentation is to move data cell to the left nearest cell tail and mark new fragments  that the movements made just like the first fragment in the 2nd row. In my case, the move procedure is to copy original memory blocks and mark the new fragment for next turn in a loop. After one round of defragment loop, all of the fragments should been move and merged to tail of all data cells just like the last row in the digram. Then we can remove the last fragment and move the append header to the start position of the last fragment.

My implementation is about 50 lines of Clojure code. I also made tests and they passed for every circumstances I can imagine. You can check my code out from here.

Building a Graph Engine -- Key-Value Store -- Off-heap Trunks

I have already finish the memory storage part of the k-v store. Early version was based on java byte array. Then I started concerned about it's constancy between threads in heavy load. I put this question on the Stackoverflow, one employee from OpenHFT confirmed my concern and gave me a solution to resolve this issue.

Off-heap features from sun.misc.Unsafe is something every java developer should avoid because it will blow up applications if it was not been used correctly. It could bypass java memory management including garbage collection to allocate up to TBs of spaces in memory directly like what we usually did in C programming language. It can also avoid thread cache and memory fence, which could accelerate the program and also produce the thread  inconstancy.

I once considered use off-heap before I tried the byte array, but I was worried about that I might not be able to handle those low level operations, and there is no official documents for unsafe because it is an internal API. I realised that the inconstancy is a huge impact on performance because I have to use read write lock or make every write operations in on thread. The byte array also have problem on it's maximum size of about 2G bytes, I have to make multiply trunks to contain more data than that.

Taking over full control of memory management seemed crucial to this project. I was never bothered to try new things to make my works more powerful. After further research on off-heap and helps from Peter Lawrey, I spent about one hour to transform my original approach to off-heap. Most of the primary data types in unsafe have native implementations, which made my work much easier. Tests indicated the performance impact are almost identical compare to the byte array approach. I also meet some JVM crashes during debugging defragmentation because I forgot to add the base address to the offset when performing copy memory, except that, everything is holding tight.

Although it would be fine if the unsafe was used properly, but it will need additional safeguard like preventing write out of the bound to avoid crashing the whole JVM when the pure java approach may just throw an exception instead.

Building a Graph Engine -- Key-Value Store

Graph access pattern are mostly random. To achieve best performance that was possible, it is wise to put everything in memory rather than disk-oriented store. Because DRAMs is much more faster and expensive than hard drives and SSDs, storage efficiency is the essence of the matter. One of my colleges from my previous employer used to developed his own graph database as well. He told me that his design hits memory limitation quickly which has low memory efficiency. His system was written in C++, the relation between nodes was represented in pointer and each node is an object. I cannot get more details on his implementation, but represent node and relations in native form like objects in object-oriented programming languages and maps is bad use case for efficiency, although it is easy for development. Especially for java, each allocated object took 12 bytes for header and additional bytes for alignment[1]. If the object contains other objects, the memory overhead consumption stacks by the number of total objects allocated, which is not acceptable for in-memory applications.

Backend store for Morpheus is a in-memory distributed key-value store. It took some ideas from RAMCloud. Each instance contains numerous of trunks, each trunk contains one constant size of byte array, all of the data record need to be serialize to write and deserialize to read on the byte array for all the time. There is also a hash map as index for record id to the location of each record in the byte array in each trunk. Because the location of the record may change due to defragmentation, value in the index hash map may also change correspondly.

Trinity and most of the relational database require to define schema before insert new data record. It may not flexible compare to the modern database systems like MongoDB and CouchBase, but it is possible to deliver much more compact storage without wasting precious spaces on describing data structure for each records. Morpheus took the same idea for it's backend store, user need to define schema with name and data type for each fields. Fields need to in a sequence for each schema, the system will write fields data according to the schema leaves no gaps and any overhead on data types or names to the fields. The system can also reconstruct the record from bytes according to schemas.

Users may delete or update records in the store. There are certain circumstances that may leaves gaps or we called fragment in the bytes array:

  • Delete: It is obvious that deleting records will erase the sections that the record took in the byte array. That will leave a gap start with the location of the record and it's size is the same as the record took. In this case, the system will mark the whole space as fragment.
  • Update: There are three possible circumstances of updating a record in the trunk. The most easy one is the replacement has the same size of the original one, the system only need to replace the bytes with the new serialized record. Another is the replacement needs less space than the original. Then the system can still write the data to the original place but the rest spaces need to be record in the trunk as fragment. When the new record took more space than the original one, the system will write the new record at the append head and update the index like inserting other new records and mark the whole section of the original one as fragment just like delete the original record.

The backend store took care of both of the circumstances above which user can use it as a fully functional key-value store.

When the system was running for a long time, frequent delete and update operation may leave lots of fragments in the trunk, which leaves less spaces for new records to store. A defragmentation mechanism is required to resolve this issue by moving record sections to fill in the gaps and keep data integrity. After on turn of defragmentation, the append head should move backward and more records can fit in. Implementing non-pause defragmentation is difficult, I will address this issue later in another article.

Each record has a integer id, the backend store distributes the record to servers and trunks in servers according to the id and fetch the record by the id. I can still use distributed hashing table for this case, which is easy to accomplish shard-nothing architecture. I have already done this part in another of my project.

 

Building a Graph Engine -- Introduction

Recently, I am trying to build my own graph database and graph compute engine system to support my research. The purpose of the system is to provide a distributed, low-latency store with computational engine based on RDDs. I named it "Morpheus", saluting another graph engine from Microsoft named "Trinity", which I took it's design concepts from this article and implemented in my system.

The system has three parts: a distributed in-memory key-value store, graph represent layer providing graph w/r operation functionality based on the k-v store and distributed data processing framework.

Right now, this project is under heavy development and I think it will took pretty much time until all of the parts are ready for missions. You can track the progress by visiting my GitHub profile. I will release some articles to reveal the details on the implementations. I have so much ideas in this moment and I need to test to make the right decision but I will provide reports in these articles and explain why I adapt those solutions.