Building a Graph Engine -- Key-Value Store -- Off-heap Trunks

I have already finished the in-memory storage part of the k-v store. The early version was based on Java byte arrays. Then I started to worry about its consistency between threads under heavy load. I posted this question on Stack Overflow, and an engineer from OpenHFT confirmed my concern and suggested a solution.

The off-heap features in sun.misc.Unsafe are something most Java developers are told to avoid, because they can blow up an application when used incorrectly. Unsafe bypasses Java memory management, including garbage collection, and can allocate terabytes of memory directly, much like what we usually do in C. It can also bypass thread caches and memory fences, which can speed up a program but also introduce inconsistency between threads.
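
For readers who have not touched the API, here is a minimal sketch of how off-heap memory is typically allocated and accessed through sun.misc.Unsafe. It is illustrative only, not the actual Morpheus code.

import sun.misc.Unsafe;
import java.lang.reflect.Field;

public class OffHeapDemo {
    // Unsafe.getUnsafe() rejects application classes, so the usual workaround
    // is reading the private "theUnsafe" field via reflection.
    private static final Unsafe UNSAFE = loadUnsafe();

    private static Unsafe loadUnsafe() {
        try {
            Field f = Unsafe.class.getDeclaredField("theUnsafe");
            f.setAccessible(true);
            return (Unsafe) f.get(null);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Unsafe not available", e);
        }
    }

    public static void main(String[] args) {
        long size = 1024L;                          // bytes to allocate off-heap
        long address = UNSAFE.allocateMemory(size); // not tracked by the GC
        try {
            UNSAFE.putLong(address, 42L);           // write a primitive directly
            long value = UNSAFE.getLong(address);   // read it back
            System.out.println(value);              // prints 42
        } finally {
            UNSAFE.freeMemory(address);             // must be freed manually
        }
    }
}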

I had considered going off-heap before I tried the byte array, but I worried that I might not be able to handle the low-level operations, and there is no official documentation for Unsafe because it is an internal API. I realised that the inconsistency had a huge impact on performance, because it forced me to use a read-write lock or funnel every write operation into one thread. The byte array also has a maximum size of about 2 GB, so I had to create multiple trunks to hold more data than that.

Taking full control of memory management seemed crucial to this project, and I have never minded trying new things to make my work better. After further research on off-heap storage and some help from Peter Lawrey, I spent about an hour converting my original approach to off-heap. Most of the primitive data types have native implementations in Unsafe, which made the work much easier. Tests indicated that performance is almost identical to the byte array approach. I also hit some JVM crashes while debugging defragmentation, because I forgot to add the base address to the offset when calling copyMemory; apart from that, everything is holding tight.
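
To illustrate that crash: the three-argument copyMemory works on absolute addresses, so offsets that are relative to a trunk must always be combined with the trunk's base address. The helper below is hypothetical, not the actual defragmentation code.

// Hypothetical helper on an off-heap trunk. Both copyMemory arguments are
// absolute addresses, so the trunk's base address must be added to the
// relative offsets; forgetting that was exactly what crashed my JVM.
static void moveRecord(sun.misc.Unsafe unsafe, long baseAddress,
                       long srcOffset, long destOffset, long length) {
    unsafe.copyMemory(baseAddress + srcOffset, baseAddress + destOffset, length);
}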

Although Unsafe is fine when used properly, it needs additional safeguards, such as preventing out-of-bounds writes, to avoid crashing the whole JVM where the pure Java approach would simply throw an exception.

Building a Graph Engine -- Key-Value Store

Graph access patterns are mostly random. To achieve the best possible performance, it is wise to keep everything in memory rather than in a disk-oriented store. Because DRAM is much faster and more expensive than hard drives and SSDs, storage efficiency is the essence of the matter. A colleague at my previous employer once developed his own graph database as well. He told me that his design hit memory limits quickly because of low memory efficiency. His system was written in C++: relations between nodes were represented as pointers and each node was an object. I could not get more details of his implementation, but representing nodes and relations in their native form, as objects and maps in object-oriented languages, is a bad choice for efficiency, even though it is easy for development. In Java especially, each allocated object takes 12 bytes for the header plus additional bytes for alignment[1]. If an object contains other objects, the overhead stacks with the total number of objects allocated, which is not acceptable for in-memory applications.

The backend store for Morpheus is an in-memory distributed key-value store. It took some ideas from RAMCloud. Each instance contains numerous trunks, and each trunk holds one constant-size byte array; every data record must be serialized on write and deserialized on read against that byte array. Each trunk also keeps a hash map as an index from record id to the location of the record in the byte array. Because the location of a record may change due to defragmentation, the values in the index hash map may change correspondingly.
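
A rough sketch of what a trunk looks like in the byte-array version is below. The class and method names are illustrative assumptions, not the actual Morpheus code; in the real store the record length comes from the schema rather than the caller.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Simplified sketch of one trunk: a fixed-size byte array plus an index
// mapping record ids to their current offsets.
class Trunk {
    private final byte[] store;
    private final Map<Integer, Integer> index = new ConcurrentHashMap<>(); // record id -> offset
    private int appendHead = 0; // next free position in the byte array

    Trunk(int sizeInBytes) {
        this.store = new byte[sizeInBytes];
    }

    synchronized int write(int recordId, byte[] serialized) {
        int offset = appendHead;
        System.arraycopy(serialized, 0, store, offset, serialized.length);
        appendHead += serialized.length;
        index.put(recordId, offset);   // defragmentation may later change this value
        return offset;
    }

    byte[] read(int recordId, int length) {
        Integer offset = index.get(recordId);
        if (offset == null) return null;
        byte[] out = new byte[length];
        System.arraycopy(store, offset, out, 0, length);
        return out;
    }
}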

Trinity and most relational databases require a schema to be defined before new records can be inserted. This is less flexible than modern database systems like MongoDB and Couchbase, but it delivers much more compact storage because no precious space is wasted describing the data structure of each record. Morpheus takes the same approach in its backend store: users need to define a schema with a name and data type for each field. Fields are ordered within each schema, and the system writes field data according to the schema, leaving no gaps and no per-field overhead for data types or names. The system can also reconstruct records from bytes according to their schemas.
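
The sketch below shows the idea of schema-driven serialization with fixed-length fields written back to back. The API and the fixed-length simplification are assumptions for illustration, not the real Morpheus schema code.

import java.nio.ByteBuffer;
import java.util.LinkedHashMap;
import java.util.Map;

// Fields are written in schema order with no type tags or field names
// stored per record; the schema is the only metadata needed to read back.
class Schema {
    // field name -> fixed byte length (e.g. int = 4, long = 8)
    private final LinkedHashMap<String, Integer> fields = new LinkedHashMap<>();

    void addField(String name, int byteLength) {
        fields.put(name, byteLength);
    }

    byte[] serialize(Map<String, byte[]> record) {
        int total = fields.values().stream().mapToInt(Integer::intValue).sum();
        ByteBuffer buf = ByteBuffer.allocate(total);
        for (Map.Entry<String, Integer> field : fields.entrySet()) {
            buf.put(record.get(field.getKey()), 0, field.getValue()); // back to back, no gaps
        }
        return buf.array();
    }

    Map<String, byte[]> deserialize(byte[] bytes) {
        Map<String, byte[]> record = new LinkedHashMap<>();
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        for (Map.Entry<String, Integer> field : fields.entrySet()) {
            byte[] value = new byte[field.getValue()];
            buf.get(value);                                           // read in schema order
            record.put(field.getKey(), value);
        }
        return record;
    }
}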

Users may delete or update records in the store. Certain circumstances may leave gaps, which we call fragments, in the byte array:

  • Delete: Deleting a record erases the section it occupied in the byte array. That leaves a gap starting at the record's location, with the same size as the record. In this case, the system marks the whole space as a fragment.
  • Update: There are three possible cases when updating a record in the trunk. The easiest is when the replacement has the same size as the original: the system only needs to overwrite the bytes with the newly serialized record. Another is when the replacement needs less space than the original: the system can still write the data in the original place, but the remaining space must be recorded in the trunk as a fragment. When the new record takes more space than the original, the system writes it at the append head and updates the index as if inserting a new record, then marks the whole section of the original as a fragment, just as if it had been deleted (see the sketch after this list).
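
Continuing the hypothetical Trunk sketch from above, the three update cases could look roughly like this; markFragment and append are assumed helpers, not actual Morpheus methods.

// Sketch of the three update cases, as a method of the hypothetical Trunk class.
synchronized void update(int recordId, byte[] newBytes, int oldOffset, int oldLength) {
    if (newBytes.length == oldLength) {
        // Case 1: same size, overwrite in place.
        System.arraycopy(newBytes, 0, store, oldOffset, newBytes.length);
    } else if (newBytes.length < oldLength) {
        // Case 2: smaller, overwrite in place and record the tail as a fragment.
        System.arraycopy(newBytes, 0, store, oldOffset, newBytes.length);
        markFragment(oldOffset + newBytes.length, oldLength - newBytes.length);
    } else {
        // Case 3: larger, append like a new record, repoint the index,
        // and mark the whole original section as a fragment.
        int newOffset = append(newBytes);
        index.put(recordId, newOffset);
        markFragment(oldOffset, oldLength);
    }
}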

The backend store takes care of both of the circumstances above, so users can treat it as a fully functional key-value store.

When the system has been running for a long time, frequent delete and update operations may leave many fragments in the trunk, which leaves less space for new records. A defragmentation mechanism is required to resolve this by moving record sections to fill the gaps while preserving data integrity. After one round of defragmentation, the append head should move back and more records can fit in. Implementing pause-free defragmentation is difficult; I will address this later in another article.
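
Here is a rough sketch of a simple stop-the-world compaction pass, again based on the hypothetical Trunk above; recordLength is an assumed helper that derives a record's size from its schema.

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Slide live records toward the front of the trunk in address order,
// repoint their index entries, and pull the append head back.
synchronized void defragment() {
    List<Map.Entry<Integer, Integer>> live = new ArrayList<>(index.entrySet());
    live.sort(Map.Entry.comparingByValue());          // order records by current offset
    int writePos = 0;
    for (Map.Entry<Integer, Integer> entry : live) {
        int offset = entry.getValue();
        int length = recordLength(entry.getKey());
        if (offset != writePos) {
            System.arraycopy(store, offset, store, writePos, length); // fill the gap
            index.put(entry.getKey(), writePos);                      // repoint the index
        }
        writePos += length;
    }
    appendHead = writePos;                             // space after the last record is reclaimed
}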

Each record has an integer id. The backend store distributes records to servers, and to trunks within servers, according to the id, and fetches records by id. I can still use a distributed hash table for this, which makes a shared-nothing architecture easy to achieve. I have already implemented this part in another of my projects.
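
As a minimal illustration of id-based placement, the sketch below hashes a record id to a server and then to a trunk. The modulo scheme is only a placeholder for the distributed hash table actually used.

// Pick a server for a record id, then a trunk on that server.
static int serverFor(int recordId, int serverCount) {
    return Math.floorMod(Integer.hashCode(recordId), serverCount);
}

static int trunkFor(int recordId, int trunkCount) {
    // Rotate the bits so the trunk choice is not correlated with the server choice.
    return Math.floorMod(Integer.rotateLeft(Integer.hashCode(recordId), 16), trunkCount);
}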

 

Building a Graph Engine -- Introduction

Recently, I have been trying to build my own graph database and graph compute engine to support my research. The purpose of the system is to provide a distributed, low-latency store with a computation engine based on RDDs. I named it "Morpheus", saluting another graph engine from Microsoft named "Trinity"; I took its design concepts from this article and implemented them in my system.

The system has three parts: a distributed in-memory key-value store, a graph representation layer providing graph read/write operations on top of the k-v store, and a distributed data processing framework.

Right now, this project is under heavy development, and I think it will take quite some time until all of the parts are ready for real use. You can track the progress by visiting my GitHub profile. I will publish articles revealing the details of the implementation. I have a lot of ideas at the moment and need to test them to make the right decisions, but I will report on them in these articles and explain why I adopted each solution.

The Demo is ready

I am thrilled to announce that the first tech demo of the new WebFusion (codenamed odin) is complete.

The address of the demo is: http://www.shisoft.net/

This demo shows feed recommendation (from about 1,000 Twitter accounts I follow for this demo) by user-selected categories. The feature is an imitation of the mobile app Zite, which has been acquired by Flipboard[1]. Simply type your interests into the text box and the system will give you hints automatically.

[Screenshot: category hints appearing under the text box]

You can then use your mouse or keyboard to select an item. If you would like the system to suggest feeds for a combination of categories, feel free to select more. Then click the search button to submit your query.

[Screenshot: recommended feeds for the selected categories]

Your recommended feeds should be displayed in no time. In the image above, you can see four feeds from Twitter. You can also scroll down for more, until there are no more feeds in the database related to your selected categories. Your results may vary between attempts, because the system receives and processes new feeds published by other people in real time; you always see the newest feeds as soon as they arrive in the system.

The system also supports recommending feeds for user-entered categories that do not exist in my database (updated 8 January 2016). Every internal category is prefixed with the symbol "♦", indicating that the category has comprehensive features in my database and the system can provide accurate recommendations. A category without the symbol indicates that the system will build features from the words the user typed, which may be less accurate than internal categories.

You can switch categories by entering new names in the text box, or simply click the links below the feeds labelled "Related Categories".

This is NOT a search engine. The system considers both keywords and the meaning of the content when categorizing, including probability distributions over topics.

This feature is still experimental. After filtering, there are over 100,000 categories the user can select for recommendations. The training process is entirely unsupervised, so it is impossible for me to check the accuracy of all of them. As far as I know, there are some flaws that make it imperfect. There are many overlapping categories with almost the same features, and their recommendation results are identical. The system can distinguish Apple the computer company from apples on trees, but it cannot tell "Google" from "Microsoft", because their topics are the same and their keywords are almost identical. Since all the categories and their data come from Wikipedia dumps, the quality cannot be controlled, and some categories are also overfitted toward their more popular subcategories.

This feature is part of the new WebFusion. In the demo version, the "Share" and "Reply" buttons are not functional yet; they are only placeholders.

The dedicated server for this demo is currently in Shanghai, because the system is huge and I cannot afford virtual servers from cloud providers like DigitalOcean. Due to the national firewall, U.S. users may see timeouts or temporary service unavailability. I am trying to overcome these problems. If you have any difficulty viewing the demo, please do not hesitate to contact me.

* I have done my best to improve the experience of visiting this demo by redirecting traffic through three additional servers. The outbound server is in San Francisco along with this blog; the origin server is still in Shanghai.

* I have already replaced ssh with shadowsocks for the internal proxy service to Twitter and other feed sources blocked in China. But the route to my shadowsocks server is also unstable. I addressed this by using HAProxy to load-balance and fail over across 9 different routes to the shadowsocks server in the U.S., and it works.

* It has been a long time since I first started collecting Twitter feeds and computing their features. I have made the parameters slightly stricter, so category recommendations should now be more accurate.

* You can find more technical details here

Real-time topic-based social network feeds recommending system

Hao Shi

2015-04-18

PDF File

Real-time topic-based social network feeds recommending system

Modern social networks are organised around authors. People connect with each other by sending friend requests or following before they share information; readers cannot retrieve information by interest. Apps like Zite allow users to define topics of interest and offer articles from multiple blogs and news feeds, but there is no such recommendation system for short feeds like tweets or Facebook posts.

We built a recommendation system, codenamed ‘Minerva’, that lets users browse the newest social network feeds by their interests. The goals of this system are:

  • Let users choose their interests in natural language, like ‘NASA’ and ‘Software’. The system returns the newest feeds from multiple sources such as Facebook / Twitter / Tumblr / Weibo / RSS related to the user's choice, ordered by publish time.
  • Label feeds with their categories.


Fig. 1. Example of category labels for a feed.

The box indicates the categories for the feed. Users can continue exploring feeds via related categories.

This article summarizes the high-level implementation of our system.

We use raw data to train low-level models such as the inverse document frequency (IDF) of words and the LDA word-topic counts (NWZ). Next, we compute word and topic distributions as features for Wikipedia categories. Each category has a name in natural language.

We need a public-domain corpus to make sense of categories. Wikipedia holds more than 4,000,000 articles and 400,000 valid categories. It is contributed by human volunteers and its quality meets our needs, which makes it ideal for this research.

In the online production scenario, when our data sources send new feeds from the social networks, we compute their features in the same way as for Wikipedia categories, then save and index the features in a database for querying. When a user wants to read feeds for one or more categories, we fetch the features for those categories and run the query entirely inside the database to retrieve related feeds. To label feeds, we use the feed's features to search for the top n categories by feature similarity.

The data flow for training low-level models and category features is described in Figure 2. We get feed data from WebFusion and Wikipedia dumps from the official Wikipedia web site. The dumps were parsed into a database. To transform the raw data into a corpus, we segmented the text, represented words as numbers and stripped formatting. The combined corpus serves as the training set for the low-level models.

Because the data is far too large for a single worker (about 22 GB of raw data for Wikipedia and 7 GB for feeds), we performed the computations in parallel on a cluster with 172 vCPUs and 224 GB of RAM. We store raw data in MongoDB, results in PostgreSQL and intermediate data in HDFS. The data processing framework is Hadoop MapReduce.

First we compute the inverse document frequency of each word. This is used both as a word feature and to shrink documents for LDA inference. Next, we compute NWZ, one of the major steps of the topic model: we run Gibbs sampling with 1024 topics for 100 iterations. NWZ is later used to compute the topic probability distribution of a text.
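
As a sketch of the IDF step, the snippet below counts the number of documents containing each word and applies the textbook formula idf(w) = log(N / df(w)); the exact smoothing used in the project is not shown here.

import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;

// Documents are lists of (already segmented) words; the result maps each word to its IDF.
static Map<String, Double> inverseDocumentFrequency(List<List<String>> documents) {
    Map<String, Integer> docFreq = new HashMap<>();
    for (List<String> doc : documents) {
        for (String word : new HashSet<>(doc)) {      // count each word once per document
            docFreq.merge(word, 1, Integer::sum);
        }
    }
    Map<String, Double> idf = new HashMap<>();
    int n = documents.size();
    for (Map.Entry<String, Integer> e : docFreq.entrySet()) {
        idf.put(e.getKey(), Math.log((double) n / e.getValue()));
    }
    return idf;
}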

Once the low-level models are prepared, we can work on the most important part of the system: extracting features for each Wikipedia category. We need to collect categories and connect them to their parent categories; the Wikipedia category hierarchy is a graph, and categories should inherit features from their subcategories. To avoid cyclic dependencies and keep feature factors focused, we limit the number of hops. This gives us, for each article, a list of its categories and neighbouring categories. We map the article's words to each item in that category list, and reduce the words for each category to obtain its bag of words. The bag of words may be too large for some categories, so we use the IDF to compute TF-IDF for each word in the category and take the top 2000 words as a compact bag of words. Category features consist of the topic distribution and the word TF-IDF, both derived from the bag of words. Both features are sparse vectors, so we can represent their factors as maps. To store the features in the database for retrieval, we use the PostgreSQL extension HStore to store and index map data.
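
Compacting a category's bag of words could be sketched like this: score every word by its term count times IDF and keep only the top 2000. The data structures and method names are assumptions for illustration.

import java.util.AbstractMap;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.Map;

// termCounts: word -> raw count in the category's bag of words; idf: word -> IDF.
static Map<String, Double> topTfIdf(Map<String, Integer> termCounts,
                                    Map<String, Double> idf,
                                    int limit) {
    Map<String, Double> compact = new LinkedHashMap<>();
    termCounts.entrySet().stream()
        .map(e -> new AbstractMap.SimpleEntry<>(
                e.getKey(), e.getValue() * idf.getOrDefault(e.getKey(), 0.0)))
        .sorted(Map.Entry.<String, Double>comparingByValue(Comparator.reverseOrder()))
        .limit(limit)                                  // keep e.g. the top 2000 words
        .forEach(e -> compact.put(e.getKey(), e.getValue()));
    return compact;
}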

That is all we need to prepare offline. When the system is online, feeds stream in from the data sources. Minerva extracts features from each feed just as it did for the training set (word TF-IDF and topic distribution) and stores them in the database together with the feed identifier. The feed content is not stored with the features; it stays in the WebFusion database, and Minerva uses the identifier to fetch the content for users.

When a user requests feeds for a category or a combination of categories, Minerva uses the category features to find similar feeds directly in the database. We built a classifier in the database engine to predict whether a feed belongs to a category. First, the database finds the rows that contain the category's feature factors, then computes a score for each feed according to a hypothesis function, and finally applies a threshold to produce the result. Going further, we could train this model with logistic regression for better results; in our experiment, we used a simpler hypothesis function to score feeds, shown below:

[Image: the hypothesis function h(x)]

D is the feed's topic distribution and W is the TF-IDF of its words; t is the threshold, and if h(x) > t the feed belongs to the topic. Because the number of candidate categories for a feed may be large, we do not determine the category list beforehand; instead, we predict categories on demand when a user asks. For performance, Minerva runs this algorithm inside the database. With the help of indexes on the maps, the database does not need to scan all records to compute their h(x).
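
The exact hypothesis function is the one in the figure above. As a stand-in, the sketch below scores a feed against a category by summing the dot products of their sparse topic and word vectors and applying the threshold t; the equal weighting of the two parts is an assumption, not the published formula.

import java.util.Map;

// Sparse vectors are maps from key (topic id or word) to weight.
static boolean belongsToCategory(Map<Integer, Double> feedTopics, Map<String, Double> feedWords,
                                 Map<Integer, Double> catTopics,  Map<String, Double> catWords,
                                 double t) {
    double score = dot(feedTopics, catTopics) + dot(feedWords, catWords);
    return score > t;
}

static <K> double dot(Map<K, Double> a, Map<K, Double> b) {
    double sum = 0.0;
    for (Map.Entry<K, Double> e : a.entrySet()) {
        Double other = b.get(e.getKey());
        if (other != null) sum += e.getValue() * other;   // only shared keys contribute
    }
    return sum;
}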

Users can also get the category list for a feed in reverse: we use the feed's features to compute h(x) for categories and take the top n in descending order.


Fig. 2. The data flow of the training procedure.


Fig. 3. The composition of the bag of words for categories that have subcategories.

 

* Thanks to Yuhao Zhu @ USC for reviewing a draft version of this article.

External scripts with ClojureScript

An important principle in optimising a web site is to reduce the number of files and compress them. lein-cljsbuild for ClojureScript does a great job here: it combines the various cljs files and compiles them into one js file, removing unnecessary code and whitespace along the way. But when a project contains external js files, how to manage them becomes an issue.

This article describes some approaches to do so. I have also been researching this for some time, and I think I should take some notes for the record.

I had high expectations for the :externs approach. I assumed it would somehow attach the contents of the js file to the compiled cljs output, but I could not find anything from the extern js file in the compiled file, and the compiler throws warnings about it. I am not quite sure what it actually does.

I also tried the :foreign-libs approach but still got nothing after compilation. I think I misunderstood the manual, but I don't know which part.

The approach I finally settled on is to attach a <script> tag to the web page after the compiled js file has loaded.

Transparent and stable proxy based on shadowsocks

I spent a day on some tedious work I had been putting off. The connectivity from our country to the rest of the world gets worse day by day for security reasons, and I could no longer stand this time-wasting problem. So I decided to solve it once and for all.

I had heard of shadowsocks for some time, but didn't use it because it requires a special program on both server and client. My friends told me it is much more stable than any kind of VPN, and also fast and able to handle thousands of connections at the same time. So I decided to give it a shot.

To my surprise, the installation was easy; like other third-party server packages, I finished it in 5 minutes. The configuration on my Mac is also simple: download ShadowsocksX and set the server address, port, password and encryption method, and that is the whole job. ShadowsocksX provides a PAC, which it registers in your system after startup, and programs on the Mac can then use the PAC as their proxy configuration. The PAC contains a list of blocked URL patterns whose traffic should go through your shadowsocks server. This is useful in most situations, but I wondered whether there is a way to redirect all of the traffic in my home to the proxy server when necessary.

By "all the traffic in my home" I mean that I want every machine, including my iPhone, iPad, Mac, servers, laptops and more, to share one shadowsocks server without any configuration on the devices themselves. The only way to achieve this is to do the job on the router.

After some research on Google, I found that a C port of shadowsocks named shadowsocks-libev contains a component designed especially for this use. This port also contains a server component, so I removed the original Python one and redeployed the C port.

The basic idea of what I will do on the router is to use iptables to redirect all traffic destined for foreign countries to a local shadowsocks proxy, which acts as a client of my shadowsocks server in the US.

The proxy on the router that connects to the shadowsocks server in the US is a program named ss-redir. It uses the same configuration format as the server, and you need to pay attention to "local_port" because it will be used later in the iptables configuration. Once the parameters are set and ss-redir is started correctly, we can redirect traffic from all the devices on the network to the proxy. But we don't want all traffic to go through the proxy, because servers in China are not affected by the national firewall. So we also need to add exceptions to iptables.

This manual is pretty clear about the whole procedure; I use the exceptions below to keep LAN and Asian destinations off the proxy.

# Ignore LANs IP address
iptables -t nat -A SHADOWSOCKS -d 0.0.0.0/8 -j RETURN
iptables -t nat -A SHADOWSOCKS -d 10.0.0.0/8 -j RETURN
iptables -t nat -A SHADOWSOCKS -d 127.0.0.0/8 -j RETURN
iptables -t nat -A SHADOWSOCKS -d 169.254.0.0/16 -j RETURN
iptables -t nat -A SHADOWSOCKS -d 172.16.0.0/12 -j RETURN
iptables -t nat -A SHADOWSOCKS -d 192.168.0.0/16 -j RETURN
iptables -t nat -A SHADOWSOCKS -d 224.0.0.0/4 -j RETURN
iptables -t nat -A SHADOWSOCKS -d 240.0.0.0/4 -j RETURN

# Ignore Asia IP address
iptables -t nat -A SHADOWSOCKS -d 1.0.0.0/8 -j RETURN
iptables -t nat -A SHADOWSOCKS -d 14.0.0.0/8 -j RETURN
iptables -t nat -A SHADOWSOCKS -d 27.0.0.0/8 -j RETURN
iptables -t nat -A SHADOWSOCKS -d 36.0.0.0/8 -j RETURN
iptables -t nat -A SHADOWSOCKS -d 39.0.0.0/8 -j RETURN
iptables -t nat -A SHADOWSOCKS -d 42.0.0.0/8 -j RETURN
iptables -t nat -A SHADOWSOCKS -d 49.0.0.0/8 -j RETURN
iptables -t nat -A SHADOWSOCKS -d 58.0.0.0/8 -j RETURN
iptables -t nat -A SHADOWSOCKS -d 59.0.0.0/8 -j RETURN
iptables -t nat -A SHADOWSOCKS -d 60.0.0.0/8 -j RETURN
iptables -t nat -A SHADOWSOCKS -d 61.0.0.0/8 -j RETURN
iptables -t nat -A SHADOWSOCKS -d 101.0.0.0/8 -j RETURN
iptables -t nat -A SHADOWSOCKS -d 103.0.0.0/8 -j RETURN
iptables -t nat -A SHADOWSOCKS -d 106.0.0.0/8 -j RETURN
iptables -t nat -A SHADOWSOCKS -d 110.0.0.0/8 -j RETURN
iptables -t nat -A SHADOWSOCKS -d 111.0.0.0/8 -j RETURN
iptables -t nat -A SHADOWSOCKS -d 112.0.0.0/8 -j RETURN
iptables -t nat -A SHADOWSOCKS -d 113.0.0.0/8 -j RETURN
iptables -t nat -A SHADOWSOCKS -d 114.0.0.0/8 -j RETURN
iptables -t nat -A SHADOWSOCKS -d 115.0.0.0/8 -j RETURN
iptables -t nat -A SHADOWSOCKS -d 116.0.0.0/8 -j RETURN
iptables -t nat -A SHADOWSOCKS -d 117.0.0.0/8 -j RETURN
iptables -t nat -A SHADOWSOCKS -d 118.0.0.0/8 -j RETURN
iptables -t nat -A SHADOWSOCKS -d 119.0.0.0/8 -j RETURN
iptables -t nat -A SHADOWSOCKS -d 120.0.0.0/8 -j RETURN
iptables -t nat -A SHADOWSOCKS -d 121.0.0.0/8 -j RETURN
iptables -t nat -A SHADOWSOCKS -d 122.0.0.0/8 -j RETURN
iptables -t nat -A SHADOWSOCKS -d 123.0.0.0/8 -j RETURN
iptables -t nat -A SHADOWSOCKS -d 124.0.0.0/8 -j RETURN
iptables -t nat -A SHADOWSOCKS -d 125.0.0.0/8 -j RETURN
iptables -t nat -A SHADOWSOCKS -d 126.0.0.0/8 -j RETURN
iptables -t nat -A SHADOWSOCKS -d 169.0.0.0/8 -j RETURN
iptables -t nat -A SHADOWSOCKS -d 175.0.0.0/8 -j RETURN
iptables -t nat -A SHADOWSOCKS -d 180.0.0.0/8 -j RETURN
iptables -t nat -A SHADOWSOCKS -d 182.0.0.0/8 -j RETURN
iptables -t nat -A SHADOWSOCKS -d 183.0.0.0/8 -j RETURN
iptables -t nat -A SHADOWSOCKS -d 202.0.0.0/8 -j RETURN
iptables -t nat -A SHADOWSOCKS -d 203.0.0.0/8 -j RETURN
iptables -t nat -A SHADOWSOCKS -d 210.0.0.0/8 -j RETURN
iptables -t nat -A SHADOWSOCKS -d 211.0.0.0/8 -j RETURN
iptables -t nat -A SHADOWSOCKS -d 218.0.0.0/8 -j RETURN
iptables -t nat -A SHADOWSOCKS -d 219.0.0.0/8 -j RETURN
iptables -t nat -A SHADOWSOCKS -d 220.0.0.0/8 -j RETURN
iptables -t nat -A SHADOWSOCKS -d 221.0.0.0/8 -j RETURN
iptables -t nat -A SHADOWSOCKS -d 222.0.0.0/8 -j RETURN
iptables -t nat -A SHADOWSOCKS -d 223.0.0.0/8 -j RETURN

Now I can enjoy the real internet. It is not especially fast, but acceptable for basic use. My VPS may use a lot more traffic after this; I hope it won't exceed the quota.


Hewlett-Packard Hackathon 2015 Shanghai Station

After I found my TOEFL studies under control, I signed up for the Hackathon as a hacker to meet some new friends.

The event was held by Hewlett-Packard in a luxury hotel. The food and drinks were really great. Most of all, I got to meet people there and learn about their skills and lifestyles.

I chose a distributed computing system as my subject and gathered two teammates. As the team leader, I did all of the development. It was 24 hours without sleep, but I felt quite energized.

One of my teammates leads the ACM team at his school and was seeking engineering experience. I taught him some basic concepts of functional programming, Lisp and Clojure. He seemed interested, but he did not really help with the development.

The other teammate is a male high school student with long hair and a girly nickname. He helped me with the keynote animations.

Mostly I did the coding. When I wanted to rest, I walked around and joined conversations about school life, work, rumours, weapons, drones, machine learning and much more. I mostly just listened and watched what others were building.

Most teams spent a lot of time on their keynotes, but I spent my time on development. The development was a success and I achieved the goals of my original design, but I lacked the time and energy for the speech and explanation, and in the end I lost the game. The first-prize team made a fantastic keynote and a humorous speech, but their code was not the same thing as their design, and the other teams did not do much coding either. I felt a bit disappointed about this.

The basic concepts of designing the new WebFusion

After pausing WebFusion development for a year, I can look back and pick out defects in the original design. When those defects accumulated to an unacceptable degree, I decided to start over with a new one.

The new WebFusion project, codenamed odin, will use Clojure as the main server-side programming language. In nearly a year of working with Clojure, I found that its pretty and elegant syntax and functional programming style can speed up development with clear expressions. Another reason I chose Clojure over other Lisp or FP languages is that I can reuse code from the old WebFusion when I want to. A further benefit of a JVM-based language is that I can use tons of libraries that are proven stable, high-performance and well documented.

Odin is fully distributed and event driven. That means each component of odin can run on different servers in any number of processes and worker threads. The components communicate with each other through Remote Function Invocation from cluster-connector, a project I created during my employment. The internal interaction is event driven, which means odin will not open a new thread pool for each user; it dispatches tasks on one thread to make full use of it while still getting concurrency benefits. It works like a reactor, dispatching messages asynchronously with meltdown, a project based on Project Reactor.

Odin also benefits from Clojure language features such as macros and multimethods. Macros let the compiler generate code from your code, which means you can direct the compiler to do the heavy lifting, producing repetitive code for you or providing your own DSL in place of complex syntax you would otherwise write by hand. You can even generate macros from macros to make the work easier for humans. Multimethods are a feature that resembles object-oriented polymorphism, but in a more flexible way: they use a dispatch function to determine which concrete function is reached, rather than a fixed class hierarchy. Dispatching on the arguments allows me to configure the behaviour of each analogous module in its own file.

Every part of the design has been tested separately, and I am now trying to put them together to make sure the system works as expected. The results are not certain yet; I will report in another article on this blog when it is done.