Sunday, November 15, 2009

Numeric Range Queries - A comparison

Problem:

RangeQueries have long been problematic for Lucene. Internally, Lucene constructs an OR query of TermQueries, one for each entry in the term table whose value falls within the specified range.

When the range covers many terms, this approach is bounded by BooleanQuery.getMaxClauseCount() (otherwise a TooManyClauses runtime exception is thrown).
And it can be really slow!

Solutions:

This blog examines the available alternatives and provides some performance analysis. (Without loss of generality, we will look at range query handling only on integer values, though the approaches discussed support all numeric types.)

NumericRangeQuery:
As part of the recent Lucene 2.9.x release, Uwe Schindler introduced NumericRangeQuery, which aims to solve this problem. (There is good documentation in the javadoc: http://lucene.apache.org/java/2_9_1/api/core/index.html) I will not do the algorithm any injustice by trying to explain its details here.
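For reference, constructing such a query is a one-liner. A minimal sketch, assuming NUMERIC_FIELD is the field indexed via NumericField with precision step 4 (as in the index structure shown later), and with illustrative bounds:

// Lucene 2.9: inclusive int range over a field indexed via NumericField(NUMERIC_FIELD, 4);
// the precision step passed here must match the one used at indexing time
Query query = NumericRangeQuery.newIntRange(NUMERIC_FIELD, 4, 0, 10000, true, true);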

FieldCacheQuery:
There is however, another approach by using the FieldCache:

1) obtain the int[] from FieldCache.getInts()
2) iterate through each element and collect it as a hit if it falls within the specified range.

Code snippet:

static Query buildFieldCacheQuery(final int start, final int end){
  Filter f = new Filter(){
    @Override
    public DocIdSet getDocIdSet(IndexReader reader) throws IOException {
      final int[] data = FieldCache.DEFAULT.getInts(reader, FIELDCACHE_FIELD);
      return new DocIdSet(){
        @Override
        public DocIdSetIterator iterator() throws IOException {
          return new DocIdSetIterator() {
            int docid = -1;

            @Override
            public int advance(int target) throws IOException {
              docid = target - 1;
              return nextDoc();
            }

            @Override
            public int docID() {
              return docid;
            }

            @Override
            public int nextDoc() throws IOException {
              int val;
              docid++;
              // scan forward until a value falls inside the (inclusive) range
              while (docid < data.length){
                val = data[docid];
                if (val >= start && val <= end)
                  return docid;
                else
                  docid++;
              }
              return DocIdSetIterator.NO_MORE_DOCS;
            }
          };
        }
      };
    }
  };
  return new ConstantScoreQuery(f);
}


Comparison:

Index structure:

// field for NumericRangeQuery, precisionStep = 4
NumericField numericField = new NumericField(NUMERIC_FIELD, 4);
numericField.setIntValue(n);
doc.add(numericField);

// plain untokenized field, loaded into the FieldCache for FieldCacheQuery
Field fieldCacheField = new Field(FIELDCACHE_FIELD, String.valueOf(n), Store.NO, Index.NOT_ANALYZED_NO_NORMS);
fieldCacheField.setOmitTermFreqAndPositions(true);
doc.add(fieldCacheField);

// formatted text field for the (deprecated) RangeQuery
Field rangeField = new Field(RANGE_FIELD, format.format(n), Store.NO, Index.NOT_ANALYZED_NO_NORMS);
rangeField.setOmitTermFreqAndPositions(true);
doc.add(rangeField);


Following are the results:


JVM settings: -server -Xms1g -Xmx1g

The test measures different range lengths relative to the possible values in the index, e.g. Range 1% corresponds to the range [0 - 10000] on an index of 1M docs.

Index Size - 1M docs:

Range %    RangeQuery    NumericRangeQuery    FieldCacheRangeQuery
1%         202 ms        1 ms                 1 ms
5%         2047 ms       3 ms                 2 ms
20%        NA            9 ms                 5 ms
50%        NA            17 ms                9 ms
100%       NA            26 ms                9 ms


Index Size - 5M docs (RangeQuery no longer measured, to stop beating the dead horse):

Range %    NumericRangeQuery    FieldCacheRangeQuery
1%         6 ms                 8 ms
5%         15 ms                11 ms
20%        38 ms                27 ms
50%        75 ms                47 ms
100%       128 ms               43 ms


Index Size - 10M docs
Range %    NumericRangeQuery    FieldCacheRangeQuery
1%         10 ms                16 ms
5%         28 ms                23 ms
20%        84 ms                53 ms
50%        153 ms               97 ms
100%       249 ms               92 ms


Conclusion & Observations:
  • Don't use RangeQuery! (Good thing it is deprecated in Lucene 2.9.x)
  • As index size increases, when the range is small, NumericRangeQuery slows down rather gracefully (sub-linearly), while FieldCacheQuery slows down linearly.
  • As index size increases, when the range is large, NumericRangeQuery slows down almost linearly, while FieldCacheQuery plateaus around the 50% mark.
  • If you expect your range covers >=5% of the corpus, FieldCacheQuery is faster.
  • FieldCacheQuery code snippet above can be easily changed to support Lucene 2.4.x for those of you that have not upgraded.
  • The FieldCacheQuery idea can be applied similarly to non-numeric fields, e.g. range of texts.
  • FieldCacheQuery incurs a one-time cost at index load (populating the FieldCache), but that cost is needed anyway if you want to do sorting.
  • If you want to do range query on a really large index, consider sharding your index.
Don't believe my numbers? Run it yourself by checking out the source code for the test at: http://code.google.com/p/lucene-book/source/checkout, and run the test: book.lid.example.range.RangeTest.

Friday, August 21, 2009

Index Optimization for realtime search - Good idea?

Overview of Optimize:
There is a Lucene API, IndexWriter.optimize(), which combines all segments into one large segment and also expunges all deleted docs.

Searching over an optimized index is very fast because you pay neither the penalty of evaluating deleted docs nor the search-time cost of OR'ing results across multiple segments. After some OS-level warming, the single segment file is loaded into the IO cache, avoiding IO costs. Hence the method name: optimize(). This is terrific for an offline indexing system, where a pristine optimized index is published for searching.
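A minimal sketch of that offline flow, assuming dir and analyzer are an existing Directory and Analyzer:

IndexWriter writer = new IndexWriter(dir, analyzer, IndexWriter.MaxFieldLength.UNLIMITED);
// ... add/update documents for the offline build ...
writer.optimize();  // merge down to a single segment and expunge deletions
writer.close();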

Segment merge:
Segment merging is essential to incremental indexing. Lucene has a "hidden" extensible API: MergePolicy (properly exposed in Lucene 2.9). By default, LogByteSizeMergePolicy is used. This policy periodically chooses to merge small segments into larger segments, where the size of a segment is the number of bytes in the segment file. Deleted docs are only expunged during a merge.

Real-time indexing:
In a real-time indexing environment, indexing operations are applied to the index constantly, and the index gets fragmented quickly. A challenge here is how to maintain an optimal index under real-time indexing.

Our application sees many updates: old documents are deleted and then re-added with newer/fresher data. Over time, the largest segment accumulated more and more deleted docs, and they were never expunged because the segment was never a candidate for a merge: deleted docs are merely marked, not removed, so the segment size remains large. In the worst case, the largest segment would contain only deleted docs.

We made an enhancement to LogMergePolicy that normalizes segment size by taking the number of deleted documents into account (and contributed it back: LUCENE-1634).
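For those on a Lucene version that already includes LUCENE-1634, enabling this behavior looks roughly like the following (method name per that patch; verify against your version's LogMergePolicy):

// assumes writer is an existing IndexWriter whose merge policy is a LogMergePolicy (the default);
// setCalibrateSizeByDeletes is the knob added by LUCENE-1634
MergePolicy mp = writer.getMergePolicy();
if (mp instanceof LogMergePolicy) {
  ((LogMergePolicy) mp).setCalibrateSizeByDeletes(true);
}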

This helped quite a bit. However, we still saw a problem when smaller segments are promoted to be merged with the largest segment:

In a realtime scenario, when smaller segments are "escalated" to be merged with the largest segment, search response time escalates as well. The merge itself gets more expensive as the segments being merged get larger. Furthermore, the newly merged segment needs to be loaded into the IO cache, and while that is happening, search time is impacted significantly.

To solve this problem, we have created a new MergePolicy implementation:

Idea:

Instead of defining an optimized index to be 1 large segment, we redefine it to be N segments of balanced size, where N is a configurable parameter. The idea is to spread the cost of a large segment merge into smaller merge costs.

Implementation:

At each merge point, the segments to merge are selected so as to maintain a balanced segment structure. The selection is modeled as a state, a merge is viewed as a transition between states, and each transition is associated with a merge cost function. We then apply the Viterbi algorithm to identify the optimal selection(s).

Performance numbers and details can be found on the wiki.

Our MergePolicy implementation has also been contributed back to Lucene: LUCENE-1924

Conclusion:

In conclusion, I would like to emphasize how indexing can affect search performance, especially in real-time search. There are often hidden problems, since they are invisible to unit tests and simple performance tests. They can also be data dependent and show up only after hours or even days of stressing the system. Thus, it is important to understand the details of indexing to build a scalable and robust system.

Credit:

I'd like to credit this idea and implementation to my colleague Yasuhiro Matsuda.

Saturday, July 18, 2009

Making BooleanQueries faster

At work today, we were looking to optimize boolean queries of the type:

f1:v1^b1 OR f1:v2^b2 OR f1:v3^b3 ...
and
f1:v1^b1 AND f1:v2^b2 AND f1:v3^b3 ...

where f1 is a field of some structured data (e.g. Analyze=No, tf=No, norm=No)

We see that this type of query has these patterns:
  1. all values are in the same field
  2. all clauses have the same operator joining them
When the number of clauses is large, BooleanQuery can be rather slow.

As of Lucene 2.4, the Lucene query API has adopted the DocIdSet and DocIdSetIterator abstractions, which opened the door to various query optimizations. (IMHO, this is one of the best improvements in Lucene from an API standpoint.)

For our project, we have quite a few OR-clauses, and the search performance was pretty bad.

Instead, when the index loads, we load into memory a data structure very much like FieldCache.StringIndex. In a 5-million-doc index shard, for a field containing N values, the memory footprint is approximately 20MB + N Strings.

Given K clauses in our OR query, we build a BitSet with K bits turned on, each bit corresponding to an index into the String array (the maximum bit is N).

We then built a DocIdSetIterator that walks the order array and checks whether the order[i]-th bit is set in the bitset.
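Here is a rough sketch of that construction against the Lucene 2.9 API (this is not the actual bobo-browse code; it uses FieldCache.StringIndex directly, and the field name and values are illustrative):

// builds a DocIdSet for "field:v1 OR field:v2 OR ..." using term ordinals from the FieldCache
static DocIdSet buildOrDocIdSet(IndexReader reader, String field, String[] values) throws IOException {
  final FieldCache.StringIndex si = FieldCache.DEFAULT.getStringIndex(reader, field);
  final OpenBitSet bits = new OpenBitSet(si.lookup.length);
  // turn on the bit for each requested value's ordinal (linear scan for simplicity;
  // the lookup array is sorted, so a binary search would normally be used)
  for (String value : values) {
    for (int ord = 1; ord < si.lookup.length; ord++) {  // ordinal 0 means "no term"
      if (value.equals(si.lookup[ord])) { bits.set(ord); break; }
    }
  }
  return new DocIdSet() {
    @Override
    public DocIdSetIterator iterator() {
      return new DocIdSetIterator() {
        int docid = -1;
        @Override
        public int docID() { return docid; }
        @Override
        public int nextDoc() {
          docid++;
          while (docid < si.order.length) {
            if (bits.get(si.order[docid])) return docid;  // this doc's term ordinal is in the set
            docid++;
          }
          return NO_MORE_DOCS;
        }
        @Override
        public int advance(int target) {
          docid = target - 1;
          return nextDoc();
        }
      };
    }
  };
}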

The resulting performance is quite encouraging: as the number of clauses increases, the iteration time stays capped. From our benchmark, once we get to 10+ clauses the performance gain is approximately 8x!

Here is a detailed write-up:
http://code.google.com/p/bobo-browse/wiki/QueryConstruction

This is an example of how good API design opens the door to optimizations like the one we were able to do.

Kudos to the Lucene team for the DocIdSet api!

Tuesday, April 21, 2009

Index Partitioning and Distributed Realtime Search

When the number of documents gets to be very large in one single index, it is usually a good idea to partition the index and distribute the search load to different processes or search nodes, each serving from a section of the corpus.


Some Background:

There are many different ways of partitioning the index depending on the application requirements; here are some examples.
  • By terms - This is good because not every search node is handling every request. Only the search nodes containing terms from the query are called to serve the request. However, normally some terms are searched more than others, and on top of that, some terms contain more documents than others. Therefore it is usually very difficult to load balance this partitioning scheme.
  • By documents - This is the most common partitioning method. Even though every search node is handling every search request, (assuming documents are uniformly distributed across the partitions,) it is very easy to balance and re-balance search traffic.
  • By time - This is a good idea for a corpus containing time-sensitive data, e.g. job postings, news, blogs etc. In such cases, recent documents are by definition more relevant than older documents. Thus, we can add more replication on partitions with recent documents while slowly discarding older partitions from the index.

Our Partitioning Scheme:



We at LinkedIn are in a real-time search situation with a high-availability requirement, so we want a partitioning scheme that allows us to avoid re-indexing/re-partitioning as our corpus grows.

We have decided to partition by ranges of documents. We choose the maximum number of documents in a partition, say N, and partition along consecutive document IDs (UIDs, not Lucene docids), e.g. partition K would contain documents ranging from K*N to (K+1)*N-1, K = 0,1,2,...

As new documents are added to the index, we grow the index by adding to the largest partition, and when a UID exceeds the maximum document ID of the largest partition, a new partition is created. This partitioning scheme allows us to grow into new partitions without a "re-partition".
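A trivial helper makes the arithmetic concrete (hypothetical names, purely to illustrate the scheme; the parameter is the configured maximum number of documents per partition, N):

// partition K holds UIDs in the range [K*N, (K+1)*N - 1]
static int partitionFor(long uid, int maxDocsPerPartition) {
  return (int) (uid / maxDocsPerPartition);
}

static int offsetInPartition(long uid, int maxDocsPerPartition) {
  return (int) (uid % maxDocsPerPartition);
}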


Cross domain "joins":

Another important benefit of this partitioning scheme is that we can load the Lucene docid to UID mapping (see my previous post) as well as the reverse, UID to Lucene docid, because we know which range the UIDs fall in. This makes it very easy to translate a filter set in the UID space into a Lucene Filter or a Lucene Query, which is important because it essentially allows us to do "joins" across different document spaces. The memory footprint for each of the two lookup arrays is docCount*4 bytes; nowadays, that's a piece of cake.
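A sketch of that UID-space to docid-space translation (hypothetical names: uidToDocid is the per-partition reverse lookup array indexed by uid - startUid, with -1 marking UIDs not present in this partition):

static Filter buildUidFilter(final long[] uids, final int[] uidToDocid, final long startUid) {
  return new Filter() {
    @Override
    public DocIdSet getDocIdSet(IndexReader reader) throws IOException {
      OpenBitSet bits = new OpenBitSet(reader.maxDoc());
      for (long uid : uids) {
        long offset = uid - startUid;
        if (offset >= 0 && offset < uidToDocid.length) {
          int docid = uidToDocid[(int) offset];
          if (docid >= 0) bits.set(docid);
        }
      }
      return bits;  // OpenBitSet is a DocIdSet in Lucene 2.9
    }
  };
}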


Realtime at the Node-Level:

To guarantee realtime behavior at the search node level, each of our partitions is powered by Zoie (see my earlier post).

Sunday, April 12, 2009

Lucene docid,UID mapping and Payload

Lucene docids are internal integers representing documents in the index, and Lucene takes the liberty of reassigning docids during segment merges and when expunging deleted documents. In general, one needs to be very careful when docids are referenced in the application context, because the document a docid refers to can change at any time.

In many applications, it is very useful to have a quick way to map a Lucene docid to an external UID, e.g. a primary key in a database, for example when filtering results against an external document set. In the case of LinkedIn people search, we filter a text search result set by a person's network, the UID here being the member id. We needed a mechanism to map a Lucene docid to the corresponding UID very quickly, inside HitCollector.collect().

A common and naive practice is to keep the UID in a Stored field, e.g.

Field("uid",myUID,Store.YES,Index.No);

and retrieve the uid via:

int uid = Integer.parseInt(indexReader.document(docid)
.get("uid"));

When recall (the number of hit candidates) is large, this method streams from the stored file for each hit, and thus does not perform well.

A better alternative is to use the FieldCache to load into an integer array a mapping from docid to uid, for each docid in the index, i.e. 0 to indexReader.maxDoc() - 1 (assuming uid is kept in an indexed field), e.g.

new Field("uid", String.valueOf(myUID), Store.NO, Index.NOT_ANALYZED_NO_NORMS);

When IndexReader loads, we load the "map array":

int[] uidArray = FieldCache.DEFAULT.getInts(indexReader,"uid");

This is done once for a new IndexReader, and at search time, the mapping is just an array lookup:

int uid = uidArray[docid];

which is rather fast.

The problem here is that the number of terms in the uid field is equal to, or very close to, the number of documents in the index. To load from the FieldCache, a random seek is done for each term, which makes loading the uidArray extremely slow. This is fine if the IndexReader is not loaded very often, but unfortunately for us, in a realtime search system we don't have the luxury of that assumption. (As a side effect, this also increases the total number of terms in the index and may impact search performance.)

Lucky for us, Lucene 2.2 introduced Payloads (a huge step towards flexible indexing), the ability to attach arbitrary data to the posting list. In our case, we create one artificial term for every document, e.g. Term("uid","_UID_"), and attach a 4-byte uid value to each posting:

class UIDTokenStream extends TokenStream{
  private Token token = new Token("_UID_",0,0);
  private byte[] buffer = new byte[4];
  private boolean returnToken = false;

  void setUID(int uid){
    // encode the uid as 4 bytes (little-endian) and attach it as the token's payload
    buffer[0] = (byte)uid;
    buffer[1] = (byte)(uid>>8);
    buffer[2] = (byte)(uid>>16);
    buffer[3] = (byte)(uid>>24);
    token.setPayload(new Payload(buffer));
    returnToken = true;
  }

  // pre-2.9 TokenStream API: emit the single "_UID_" token once per document
  public Token next() throws IOException{
    if (returnToken){ returnToken = false; return token; }
    else { return null; }
  }
}

UIDTokenStream tokenStream = new UIDTokenStream();
tokenStream.setUID(myUID);
doc.add(new Field("uid", tokenStream));

When we load the uidArray, we do:

TermPositions tp = null;
byte[] dataBuffer = new byte[4];
int[] uidArray = new int[indexReader.maxDoc()];
try{
  tp = indexReader.termPositions(new Term("uid","_UID_"));
  while(tp.next()){
    int doc = tp.doc();
    tp.nextPosition();
    dataBuffer = tp.getPayload(dataBuffer, 0);
    // convert the 4 payload bytes (little-endian) back to an int
    int uid = ((dataBuffer[3] & 0xFF) << 24) | ((dataBuffer[2] & 0xFF) << 16)
            | ((dataBuffer[1] & 0xFF) << 8)  | (dataBuffer[0] & 0xFF);
    uidArray[doc] = uid;
  }
}
finally{
  if (tp!=null){
    tp.close();
  }
}

Looking up the uid is the same as before, simply an array lookup:

int uid = uidArray[docid];

The difference here is that when loading the uidArray, a sequential read is done for each docid, at the small cost of a byte[] -> int conversion. (Also, to the earlier point, this method introduces only 1 extra term to the entire index.)

We ran some performance numbers on a 2M-document index: loading from the FieldCache took more than 16.5 seconds, while loading the uidArray from the payload took 430 ms, an improvement of over 38x! (These numbers were taken a while ago on my MacBook Pro using Lucene 2.2.)

This mechanism is built into the Zoie Realtime Search and Indexing System, where it is used to filter out duplicate documents between the in-memory indexes and the disk index. (Since they are independent indexes, the only consistent id to filter on is the UID.)

Tuesday, April 7, 2009

Zoie - a realtime search and indexing system

A realtime search system makes it possible for queries to find a new document immediately or almost immediately after it has been updated. With Lucene's incremental update functionality, we were able to extend Lucene to support realtime indexing/search.

We open-sourced this technology and called it Zoie: http://code.google.com/p/zoie.

Our design uses multiple indexes: one main index on disk, and two additional indexes in memory to handle transient indexing events.

The disk index will grow and be rather large, therefore indexing updates on disk are performed in batches. Processing updates in batches allows us to merge updates to the same document and reduce redundant work. Moreover, the disk index does not get fragmented because the indexer is not thrashed by a large number of small indexing calls/requests. We keep a shared disk-based IndexReader to serve search requests. Once batch indexing is performed, we build/load a new IndexReader and then publish it as the new shared IndexReader. The cost of building/loading the IndexReader is thus hidden from the cost of search.

To ensure realtime behavior, we have two helper memory indexes (MemA and MemB) that alternate in their roles. One index, say MemA, accumulates events and directly serves realtime results. When a flush/commit event occurs, MemA stops receiving new events, which are sent to MemB instead. At this time, search requests are served from MemA, MemB and the disk index. Once the disk merge is completed, MemA is cleared and MemB takes its place. Until the next flush/commit, searches are served from the new disk index and MemB.

For each search request, we open and load a new IndexReader from each of the memory indexes and combine them with the shared disk IndexReader into a MultiIndexReader built from those three readers.
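A minimal sketch of assembling that per-request reader with stock Lucene classes (the memory-index Directory objects and the shared disk reader are assumptions here; Zoie's own wiring is more involved):

static IndexSearcher openSearcher(IndexReader sharedDiskReader, Directory memA, Directory memB) throws IOException {
  IndexReader memAReader = IndexReader.open(memA, true);  // read-only readers over the memory indexes
  IndexReader memBReader = IndexReader.open(memB, true);
  IndexReader combined = new MultiReader(new IndexReader[]{ sharedDiskReader, memAReader, memBReader }, false);
  return new IndexSearcher(combined);
}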

Zoie has been running in production at http://www.linkedin.com; in distributed mode it is handling almost 40 million documents (user profiles) in realtime and serving over 6 million requests a day with an average latency below 50ms.

We welcome contributions in any form to move the project forward.