Tuesday, April 21, 2009

Index Partitioning and Distributed Realtime Search

When the number of documents in a single index grows very large, it is usually a good idea to partition the index and distribute the search load across different processes or search nodes, each serving from a section of the corpus.


Some Background:

There are many different ways of partitioning the index depending on the application requirements; here are some examples.
  • By terms - This is good because not every search node handles every request; only the search nodes containing terms from the query are called to serve the request. However, some terms are normally searched more often than others, and on top of that, some terms match more documents than others. Therefore it is usually very difficult to load-balance this partitioning scheme.
  • By documents - This is the most common partitioning method. Even though every search node handles every search request, it is very easy to balance and re-balance search traffic (assuming documents are uniformly distributed across the partitions).
  • By time - This is a good idea for a corpus containing time-sensitive data, e.g. job postings, news, blogs, etc. In such cases, recent documents are by definition more relevant than older documents. Thus, we can add more replication on partitions with recent documents while slowly discarding older partitions from the index.

Our Partitioning Scheme:



At LinkedIn, we are in a realtime search situation while also guaranteeing high availability, so we want a partitioning scheme that allows us to avoid re-indexing/re-partitioning as our corpus grows.

We have decided to partition by ranges of documents. We choose a maximum number of documents per partition, say N, and partition along consecutive document IDs (UIDs, not Lucene docids), e.g. partition K contains documents with UIDs ranging from K*N to (K+1)*N-1, for K = 0, 1, 2, ...

As new documents are added to the index, we grow by adding to the largest partition, and when a UID exceeds the maximum UID allowed on the largest partition, a new partition is created. This partitioning scheme allows us to grow into newer partitions without a "re-partition", as sketched below.
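To make the arithmetic concrete, here is a minimal sketch of the range-based partition math; RangePartitioner, maxPartitionSize, and largestPartition are illustrative names, not our production code:

// Partition K holds UIDs in [K*N, (K+1)*N - 1], so the partition for a UID
// is just integer division, and a new partition is needed once a UID maps
// past the largest existing partition.
public class RangePartitioner {
  private final long maxPartitionSize; // N: maximum documents per partition

  public RangePartitioner(long maxPartitionSize) {
    this.maxPartitionSize = maxPartitionSize;
  }

  public int partitionFor(long uid) {
    return (int) (uid / maxPartitionSize);
  }

  public boolean needsNewPartition(long uid, int largestPartition) {
    return partitionFor(uid) > largestPartition;
  }
}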


Cross domain "joins":

Another important benefit we enjoy with this partitioning scheme is that we can load the Lucene docid to UID mapping (see my previous post) as well as the reverse, UID to Lucene docid, because we know the range the UIDs fall in. We can thus very easily translate a filter set in the UID space into a Lucene Filter or a Lucene Query. This is important because it essentially allows us to do "joins" across different document spaces. The memory footprint for holding the two lookup arrays is docCount*4 bytes each, which nowadays is a piece of cake.
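As an illustration, here is a hedged sketch of such a translation: a UID-space filter set turned into a Lucene Filter via the docid-to-UID array. It assumes the Lucene 2.x Filter API (bits(IndexReader)) and the uidArray described in the previous post; UIDFilter is an illustrative name, not our actual class.

import java.io.IOException;
import java.util.BitSet;
import java.util.Set;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.search.Filter;

public class UIDFilter extends Filter {
  private final Set<Integer> uidSet;  // the filter set, expressed in UID space
  private final int[] uidArray;       // docid -> UID lookup for this partition

  public UIDFilter(Set<Integer> uidSet, int[] uidArray) {
    this.uidSet = uidSet;
    this.uidArray = uidArray;
  }

  public BitSet bits(IndexReader reader) throws IOException {
    BitSet bits = new BitSet(reader.maxDoc());
    // keep every docid whose UID appears in the filter set
    for (int docid = 0; docid < uidArray.length; docid++) {
      if (uidSet.contains(uidArray[docid])) {
        bits.set(docid);
      }
    }
    return bits;
  }
}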


Realtime at the Node-Level:

To guarantee realtime behavior at the search-node level, each of our partitions is powered by Zoie (see my earlier post).

Sunday, April 12, 2009

Lucene docid,UID mapping and Payload

Lucene docids are internal integers representing documents in the index, and Lucene takes the liberty of reassigning docids during segment merges and when expunging deleted documents. In general, one needs to be very careful when docids are referenced in the application context, because the document a docid refers to can change at any time.

In many applications, it is very useful to have a quick way to map a Lucene docid to an external UID, e.g. a primary key in a database, for example when filtering results against an external document set. In the case of LinkedIn people search, we filter a text search result set by a person's network, the UID in this case being the member id. We needed a mechanism to map a Lucene docid to the corresponding UID very quickly inside HitCollector.collect().

A common and naive practice is to keep the UID in a Stored field, e.g.

Field("uid",myUID,Store.YES,Index.No);

and retrieve the uid via:

int uid = Integer.parseInt(indexReader.document(docid).get("uid"));

When recall (the number of hit candidates) is large, this method streams from the stored file for each hit and thus does not perform well.

A better alternative is to use the FieldCache to load a docid-to-uid mapping into an integer array covering every docid in the index, i.e. 0 to indexReader.maxDoc()-1 (assuming the uid is kept in an indexed field), e.g.

new Field("uid", String.valueOf(myUID), Store.NO,
          Index.NOT_ANALYZED_NO_NORMS);

When a new IndexReader is loaded, we load the "map array":

int[] uidArray = FieldCache.DEFAULT.getInts(indexReader,"uid");

This is done once for a new IndexReader, and at search time, the mapping is just an array lookup:

int uid = uidArray[docid];

which is rather fast.
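For example, the network-filtering use case mentioned above boils down to a HitCollector that consults the array for each hit. A minimal sketch follows; NetworkFilterCollector and the accept logic are illustrative, not LinkedIn's actual code.

import java.util.Set;
import org.apache.lucene.search.HitCollector;

public class NetworkFilterCollector extends HitCollector {
  private final int[] uidArray;        // docid -> UID mapping
  private final Set<Integer> network;  // external UID set, e.g. a member's network

  public NetworkFilterCollector(int[] uidArray, Set<Integer> network) {
    this.uidArray = uidArray;
    this.network = network;
  }

  public void collect(int doc, float score) {
    int uid = uidArray[doc];           // O(1) array lookup per hit
    if (network.contains(uid)) {
      // accept the hit, e.g. push it into a priority queue of top results
    }
  }
}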

The problem here is that the number of terms in the uid field is equal to or very close to the number of documents in the index. To load from the FieldCache, a random seek is done for each term, which makes loading the uidArray extremely slow. This is fine if the IndexReader is not loaded very often, but unfortunately for us, in a realtime search system, we don't have the luxury of such an assumption. (As a side effect, this also increases the total number of terms in the index and may impact search performance.)

Luckily for us, Lucene 2.2 introduced payloads (a huge step towards flexible indexing), the ability to attach arbitrary data to the posting list. In our case, we create the same artificial term for every document, e.g. Term("uid","_UID_"), and attach a 4-byte uid value to each posting:

class UIDTokenStream extends TokenStream {
  private Token token = new Token("_UID_", 0, 0); // single artificial token per document
  private byte[] buffer = new byte[4];            // 4-byte payload holding the uid
  private boolean returnToken = false;

  void setUID(int uid) {
    // encode the uid as 4 bytes, little-endian
    buffer[0] = (byte) uid;
    buffer[1] = (byte) (uid >> 8);
    buffer[2] = (byte) (uid >> 16);
    buffer[3] = (byte) (uid >> 24);
    token.setPayload(new Payload(buffer));
    returnToken = true;
  }

  public Token next() throws IOException {
    // emit the single token once per document, then signal end of stream
    if (returnToken) {
      returnToken = false;
      return token;
    }
    return null;
  }
}

UIDTokenStream tokenStream = new UIDTokenStream();
tokenStream.setUID(myUID);
Field("uid",tokenStream);

When we load the uidArray, we do:

TermPositions tp = null;
byte[] dataBuffer = new byte[4];
int[] uidArray = new int[indexReader.maxDoc()];
try {
  tp = indexReader.termPositions(new Term("uid", "_UID_"));
  while (tp.next()) {
    int doc = tp.doc();
    tp.nextPosition();             // move to the single position holding the payload
    tp.getPayload(dataBuffer, 0);  // read the 4-byte uid payload
    // convert the little-endian byte[] back to an int
    int uid = ((dataBuffer[3] & 0xFF) << 24) |
              ((dataBuffer[2] & 0xFF) << 16) |
              ((dataBuffer[1] & 0xFF) << 8)  |
               (dataBuffer[0] & 0xFF);
    uidArray[doc] = uid;           // index by docid, robust to deleted documents
  }
}
finally {
  if (tp != null) {
    tp.close();
  }
}

Looking up the uid is the same as before, simply an array lookup:

int uid = uidArray[docid];

The difference here is that when loading the uidArray, a sequential read is done for each docid, at the cost of a byte[]-to-int conversion per document. (Also, to the earlier point, this method introduces only 1 extra term to the entire index.)

We ran some performance numbers on a 2M-document index: loading from the FieldCache took more than 16.5 seconds, while loading the uidArray from the payload took 430 ms, an improvement of over 38x! (These numbers were taken a while ago on my MacBook Pro using Lucene 2.2.)

This mechanism is built into the Zoie Realtime Search and Indexing System, where it is used to filter out duplicate documents between the in-memory indexes and the disk index. (Since they are independent indexes, the only consistent id to filter on is the UID.)

Tuesday, April 7, 2009

Zoie - a realtime search and indexing system

A realtime search system makes it possible for queries to find a new document immediately or almost immediately after it has been updated. With Lucene's incremental update functionality, we were able to extend Lucene to support realtime indexing/search.

We open-sourced this technology and called it Zoie: http://code.google.com/p/zoie.

Our design uses multiple indexes: one main index on disk and two additional indexes in memory to handle transient indexing events.

The disk index will grow rather large, so indexing updates on disk are performed in batches. Processing updates in batches allows us to merge updates on the same document and reduce redundant work. Moreover, the disk index does not get fragmented, since the indexer is not thrashed by a large number of small indexing calls/requests. We keep a shared disk-based IndexReader to serve search requests. Once a batch is indexed, we build/load a new IndexReader and then publish it as the new shared IndexReader. The cost of building/loading the IndexReader is thus hidden from the cost of search.
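A minimal sketch of that reader-publication idea, assuming a volatile reference to the shared reader; the class, field, and method names are illustrative, not Zoie's actual API:

import java.io.IOException;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.store.Directory;

public class DiskReaderHolder {
  private volatile IndexReader sharedDiskReader; // the reader all searches share

  public IndexReader getSharedReader() {
    return sharedDiskReader;
  }

  // called after each batch is flushed to the disk index
  public void publishNewDiskReader(Directory diskIndexDir) throws IOException {
    IndexReader newReader = IndexReader.open(diskIndexDir); // loaded off the search path
    IndexReader oldReader = sharedDiskReader;
    sharedDiskReader = newReader;  // publish: new searches see the new reader
    if (oldReader != null) {
      oldReader.close();           // real code must wait out in-flight searches first
    }
  }
}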

To ensure realtime behavior, we have two helper memory indexes (MemA and MemB) that alternate in their roles. One index, say MemA, accumulates events and directly serves realtime results. When a flush/commit event occurs, MemA stops receiving new events; they are sent to MemB instead. At this time, search requests are served from MemA, MemB, and the disk index. Once the disk merge is completed, MemA is cleared and MemB takes its place. Until the next flush/commit, searches are served from the new disk index and MemB.

For each search request, we open a new IndexReader on each of the memory indexes and, together with the shared disk IndexReader, return a MultiReader built from those three IndexReaders.
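A hedged sketch of assembling that per-request reader, assuming Lucene's MultiReader and memory indexes backed by Directory instances; the class and parameter names are illustrative:

import java.io.IOException;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.MultiReader;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.store.Directory;

public class RealtimeSearcherFactory {
  // memIndexA/memIndexB are the two memory indexes (e.g. RAMDirectory instances),
  // diskReader is the shared disk IndexReader.
  public IndexSearcher open(Directory memIndexA, Directory memIndexB,
                            IndexReader diskReader) throws IOException {
    IndexReader memAReader = IndexReader.open(memIndexA); // small, cheap to open per request
    IndexReader memBReader = IndexReader.open(memIndexB);
    IndexReader combined = new MultiReader(
        new IndexReader[] { memAReader, memBReader, diskReader });
    return new IndexSearcher(combined);
  }
}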

Zoie has been running in production at http://www.linkedin.com. In distributed mode, it handles almost 40 million documents (user profiles) in realtime and serves over 6 million requests a day with an average latency below 50 ms.

We welcome contributions in any form to move the project forward.