Parallel incremental indexing [LUCENE-1879] #2954

Open
asfimport opened this issue Aug 31, 2009 · 21 comments

Comments


asfimport commented Aug 31, 2009

A new feature that allows building parallel indexes and keeping them in sync on a docID level, independent of the choice of the MergePolicy/MergeScheduler.

Find details on the wiki page for this feature:

http://wiki.apache.org/lucene-java/ParallelIncrementalIndexing

Discussion on java-dev:

http://markmail.org/thread/ql3oxzkob7aqf3jd


Migrated from LUCENE-1879 by Michael Busch, 5 votes, updated May 09 2016
Attachments: parallel_incremental_indexing.tar

Michael Busch (migrated from JIRA)

I have a prototype that I implemented at IBM; it contains a version that works with Lucene 2.4.1. I'm not planning to commit it as is, because it is implemented on top of Lucene's APIs without any core changes and is therefore not as efficient as it could be. The software grant I have lists these files. Shall I attach the tar + md5 here and send the signed software grant to you, Grant?


Grant Ingersoll (@gsingers) (migrated from JIRA)

Yes on the software grant.


Michael Busch (migrated from JIRA)

MD5 (parallel_incremental_indexing.tar) = b9a92850ad83c4de2dd2f64db2dcceab
md5 computed on Mac OS 10.5.7

This tar file contains all files listed in the software grant. It is a prototype that works with Lucene 2.4.x only, not with current trunk.
It also has some of the limitations mentioned before, which are not limitations of the design but rather stem from the fact that it runs on top of Lucene's APIs (I wanted the code to run with an unmodified Lucene jar).

Next I'll work on a patch that runs with current trunk.


Michael McCandless (@mikemccand) (migrated from JIRA)

I wonder if we could change Lucene's index format to make this feature
simpler to implement...

I.e., you're having to go to great lengths (since this is built
"outside" of Lucene's core) to force multiple separate indexes to
share everything but the postings files (merge choices, flushing,
deletion files, segments files, turning off the stores, etc.).

What if we could invert this approach, so that we use only a single
index/IndexWriter, but allow "partitioned postings", where sets of
fields are mapped to different postings files within the segment?

Whenever a doc is indexed, postings from its fields are then written
according to this partitioning. E.g., if I map "body" to partition 1 and
"title" to partition 2, then I'd have two sets of postings files for
each segment.

Could something like this work?


asfimport commented Nov 4, 2009

Michael Busch (migrated from JIRA)

I realize the current implementation that's attached here is quite
complicated, because it works on top of Lucene's APIs.

However, I really like its flexibility. You can right now easily
rewrite certain parallel indexes without touching the others. I use it in
quite different ways. E.g., you can easily load one parallel index into a
RAMDirectory or onto an SSD and leave the other ones on conventional disk.

#3100 only optimizes one particular use case of parallel indexing,
where you want to (re)write a parallel index containing only posting
lists. This will especially improve scenarios like the one Yonik pointed
out a while ago on java-dev, where you want to update only a few
documents, not, e.g., a certain field for all documents.

In other use cases it is certainly desirable to have a parallel index
that contains a store. It really depends on what data you want to
update individually.

I envision the version of parallel indexing that goes into Lucene's
core quite differently from the current patch here. That's why I'd
like to refactor the IndexWriter (#3101) into a SegmentWriter and,
let's call it, an IndexManager (the component that controls flushing,
merging, etc.). You can then have a ParallelSegmentWriter, which
partitions the data into parallel segments, while the IndexManager can
behave the same way as before.

You can keep thinking about the whole index as a collection of segments,
just that now it will be a matrix of segments instead of a one-dimensional
list.

E.g. the norms could in the future be a parallel segment with a single
column-stride field that you can update by writing a new generation of
the parallel segment.

Things like two-dimensional merge policies will nicely fit into this
model.

Different SegmentWriter implementations will allow you to write single
segments in different ways, e.g. doc-at-a-time (the default one with
addDocument()) or term-at-a-time (like addIndexes*() works).

So I agree we can achieve updating posting lists the way you describe,
but then it will be limited to posting lists. If we allow (re)writing
segments in both dimensions, I think we will create a more flexible
approach which is independent of what data structures we add to Lucene,
as long as they are not global to the index but per-segment, as most
of Lucene's structures are today.

What do you think? Of course I don't want to over-complicate all this,
but if we can get #3101 right, I think we can implement parallel
indexing in this segment-oriented way nicely.
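
To make the proposed split a bit more concrete, here is a purely hypothetical sketch; none of these interfaces exist in Lucene (only Document, SegmentInfo and IOException are real types), they just illustrate the roles described above.

import java.io.IOException;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.SegmentInfo;

// Hypothetical: writes a single segment, doc-at-a-time or term-at-a-time.
interface SegmentWriter {
  void addDocument(Document doc) throws IOException;
  SegmentInfo flush() throws IOException;
}

// Hypothetical: index-level component that decides when to flush/merge/commit,
// independent of how individual segments are written.
interface IndexManager {
  SegmentWriter newSegmentWriter() throws IOException;
  void maybeMerge() throws IOException;
  void commit() throws IOException;
}

// A ParallelSegmentWriter would implement SegmentWriter and fan each document's
// fields out into several parallel segments (one column of the segment matrix
// per field partition), while the IndexManager stays unchanged.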


asfimport commented Nov 6, 2009

Michael McCandless (@mikemccand) (migrated from JIRA)

This sounds great! In fact your proposal for a ParallelSegmentWriter
is just like what I'm picturing – making the switching "down low"
instead of "up high" (above Lucene). This'd be more generic than just
the postings files, since all index files can be separately written.

It'd then be a low-level question of whether ParallelSegmentWriter stores
its files in different Directories, or a single directory with
different file names (or maybe sub-directories within a directory, or
something else). It could even use FileSwitchDirectory, e.g. to direct
certain segment files to an SSD (another way to achieve your example).
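
For reference, FileSwitchDirectory already supports that kind of routing by file extension; a minimal sketch, using later-era APIs (FSDirectory.open(Path) is 5.x-style) and made-up paths and example extensions:

import java.nio.file.Paths;
import java.util.HashSet;
import java.util.Set;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.store.FileSwitchDirectory;

public class SsdRoutingSketch {
  public static void main(String[] args) throws Exception {
    Directory ssd = FSDirectory.open(Paths.get("/mnt/ssd/index"));
    Directory disk = FSDirectory.open(Paths.get("/mnt/disk/index"));

    // Files whose extensions are in this set go to the primary directory (the SSD);
    // all other segment files go to the secondary directory on spinning disk.
    Set<String> ssdExtensions = new HashSet<>();
    ssdExtensions.add("tim"); // term dictionary (extension names are just examples)
    ssdExtensions.add("tip"); // term index

    Directory dir = new FileSwitchDirectory(ssdExtensions, ssd, disk, true);
    // hand 'dir' to IndexWriter/DirectoryReader as usual
    dir.close();
  }
}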

This should also fit well into #2532 (flexible indexing) – one
of the added test cases there creates a per-field codec wrapper that
lets you use a different codec per field. Right now, this means
separate file names in the same Directory for that segment, but we
could allow the codecs to use different Directories (or, FSD as well)
if they wanted to.
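
(For reference, in later Lucene versions this per-field routing became a standard extension point; a rough sketch in 4.x-style APIs, with the field name and formats chosen only as examples:)

import org.apache.lucene.codecs.PostingsFormat;
import org.apache.lucene.codecs.lucene46.Lucene46Codec;
import org.apache.lucene.codecs.memory.MemoryPostingsFormat;

// Routes the "title" field's postings into a different postings format (and thus
// different per-segment files) than every other field.
public class TitleSplitCodec extends Lucene46Codec {
  @Override
  public PostingsFormat getPostingsFormatForField(String field) {
    return "title".equals(field)
        ? new MemoryPostingsFormat()
        : super.getPostingsFormatForField(field);
  }
}
// usage: indexWriterConfig.setCodec(new TitleSplitCodec());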

Different SegmentWriter implementations will allow you to write single
segments in different ways, e.g. doc-at-a-time (the default one with
addDocument()) or term-at-a-time (like addIndexes*() works).

Can you elaborate on this? How is addIndexes* term-at-a-time?

If we allow (re)writing segments in both dimensions I think we will
create a more flexible approach which is independent on what data
structures we add to Lucene

Dimension 1 is the docs, and dimension 2 is the assignment of fields
into separate partitions?


asfimport commented Nov 6, 2009

Michael Busch (migrated from JIRA)

This sounds great! In fact your proposal for a ParallelSegmentWriter
is just like what I'm picturing - making the switching "down low"
instead of "up high" (above Lucene). This'd be more generic than just
the postings files, since all index files can be separately written.

Right. The goal should be to be able to use this for updating Lucene-internal things (like norms, column-stride fields), but also to give advanced users APIs so that they can partition their data into parallel indexes according to their update requirements (which the current "above Lucene" approach allows).

It'd then be a low-level question of whether ParallelSegmentWriter stores
its files in different Directories, or, a single directory with
different file names (or maybe sub-directories within a directory, or,
something else). It could even use FileSwitchDirectory, eg to direct
certain segment files to an SSD (another way to achieve your example).

Exactly! We should also keep the distributed indexing use case in mind here. It could make sense for systems like Katta to not only shard in the document direction.

This should also fit well into #2532

Sounds great!


Michael Busch (migrated from JIRA)

Can you elaborate on this? How is addIndexes* term-at-a-time?

Let's say we have an index 1 with two fields a and b, and you want to create a new parallel index 2 into which you copy all posting lists of field b. You can achieve this using addDocument() if you iterate over all posting lists of 1's field b (call it 1b) in parallel and, for each document in 1, create a corresponding document in 2 that contains the terms of the posting lists from 1b that have a posting for the current document. This is what I called the "document-at-a-time" approach.

However, this is terribly slow (I tried it out), because of all the posting lists you perform I/O on in parallel. It's far more efficient to copy an entire posting list over from 1b to 2, because then you only perform sequential I/O. And if you use 2.addIndexes(IndexReader(1b)), then exactly this happens, because addIndexes(IndexReader) uses the SegmentMerger to add the index. The SegmentMerger iterates the dictionary and consumes the posting lists sequentially. That's why I called this the "term-at-a-time" approach. In my experience, for a use case similar to the one described here, it is orders of magnitude more efficient: my doc-at-a-time algorithm ran ~20 hours, the term-at-a-time one 8 minutes! The resulting indexes were identical.
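
Roughly, the term-at-a-time copy looks like this, using 2.x/3.x-era APIs (IndexReader.open and addIndexes(IndexReader...); later versions changed these signatures) and assuming dir1b is an index that already holds only field b:

import java.io.IOException;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.store.Directory;

class TermAtATimeCopy {
  // Copies every posting list from the single-field index in dir1b into writer2
  // via SegmentMerger (term-at-a-time), instead of re-adding documents one by one.
  static void copySlice(Directory dir1b, IndexWriter writer2) throws IOException {
    IndexReader reader1b = IndexReader.open(dir1b);
    try {
      writer2.addIndexes(reader1b); // in 2.4: addIndexes(new IndexReader[] { reader1b })
      writer2.commit();
    } finally {
      reader1b.close();
    }
  }
}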


Michael Busch (migrated from JIRA)

Dimension 1 is the docs, and dimension 2 is the assignment of fields
into separate partitions?

Yes, dimension 1 is unambiguously the docs. Dimension 2 can be the assignment of fields into separate parallel indexes, or also what we today call generations, e.g. for the norms files.


Shai Erera (@shaie) (migrated from JIRA)

(Warning, this post is long, and is easier to read in JIRA)

I've investigated the attached code a lot and I'd like to propose a different design and approach to this whole Parallel Index solution. I'll start by describing the limitations of the current design (whether it's the approach or the code is debatable):

  • Lucene is not built/designed properly for a Master/Slave architecture, where different indexes share important files with others (such as segments_N, segments.gen and .del).
    • I realized this when I found that if the tests (in this patch) are run with "-ea", many assertion exceptions are printed from IndexWriter.startCommit. The reason is that the Master has just updated one of the segments' .del generation (and deleted the previous one), but the Slave is not aware of that yet and looks for the wrong .del file. While this does not happen in production (e.g. "-ea" is usually not activated), it does affect the tests because the assertion stops operations abruptly.
    • Though one could claim we can fix that, I think it points at a problem in the design, and makes the whole solution fragile.
  • I think it'd be really neat to introduce a ParallelWriter, equivalent to ParallelReader. The latter does not have a Master/Slave notion, and so I don't think PW should have one either.
  • When I inspected the code carefully, I realized there are lots of hoops jumped through in order to keep the Master and Slave in sync. Some of those hoops might be avoided if Lucene's IW API were more extensible, but still:
    • The MergePolicy used for the Slaves is one that never checks the segments to decide which merges should actually be done. Rather, it relies on the Master's policy to set the proper merges. That is a must in this design, because only the Master should decide when to merge.
    • However, and here I think it's because of a lack of API on IW, the way a merge is done is that the Master first calls mergeInit(merge), then calls .maybeMerge() on all Slaves, and then merges that merge itself. maybeMerge() on the Slaves consumes all the merges that the Master decided to run, while by the time that finishes, the Master hasn't finished even one merge ...
    • That works only because the MergeScheduler used is a serial one (not SMS, but still serial) and blocking. However, it leads to inconsistencies - the Slaves' view of the segments differs at some points in time from the Master's.
  • The current approach does not support multi-threaded indexing, but I think that's a limitation that could be solved by exposing some API on IW or DW.
  • Only SMS is supported on the Slaves.
  • Optimize and expungeDeletes are unsupported, though they could be supported - perhaps they're just not implemented.
  • The current approach prevents an architecture in which some of the parallels reside on different machines, because they share the .del and segments files with the Master. It's not the worst limitation in the world, but still a limitation (of having any chance to do it efficiently) I'd like to avoid.
  • And I'm sure there are more disadvantages that I don't remember now.

I'd like to point out that even if the above limitations can be worked around, I still think the Master and Slave notion is not the best approach. At least, I'd like to propose a different approach:

  • Introduce a ParallelWriter which serves as a manager/orchestrator on top of other IWs. It is not a pure decorator because it drives everything that happens on the IWs, but it does not contain any actual indexing logic (e.g. add/delete/update documents).
    • The IWs that PW manages will hereinafter be called Slices.
  • IW will expose enough API to perform two-phase operations, like the two-phase commit one can achieve today (see the sketch after this list). Example operations (I don't cover all of them, in the interest of space):
    • addDocument - first obtain a doc ID, then proceed w/ addDocument on all Slices
    • optimize - already exists
    • merge - do the merge on all Slices and stamp it after all finished.
    • deleteDocuments - here we would need to expose some API on IW for DW to get an IndexReader so that IW can still return its readerPool.getReader but PW will return a ParallelSegmentReader or something, to perform the deletes across all Slices.
    • The idea is that we should do enough on the Slices so that if one fails we can still rollback, and the final 'stamp' process will be very fast and less likely to fail.
  • For correctness and safety, PW will only accept a Directory and not an IW. The reason is:
    • We want all sorts of settings like MP, MS and RAM buffer usage to be controlled by PW and not on the Slices. If we allowed passing an IW instance, one could override whatever we set, which is wrong.
    • Even though one could claim that someone 'can shoot himself in the leg freely', I think that we should be resilient enough to protect stupid users from themselves.
    • We will need to allow passing in an IW Configuration object, so that we can still account for settings such as Analyzer, MaxFieldLength etc., but discard other settings which PW will control directly.
      • Such a Configuration was proposed in the past already and will eliminate lots of methods and ctors on IW.
    • On a side note, ParallelReader accepts IRs today, which can lead to problems, such as when one passes two IRs, one read-only and one not, and then deletes documents through the writable IR without PR knowing about it. But that's a different issue, and I'll open a separate one for it.
  • A special MergeScheduler and MergePolicy will be introduced to allow PW to drive merges across the Slices. The idea is to support whatever MS/MP the application wants (SMS, CMS, custom), and to ensure that when the MP decides a merge should be performed, that merge is executed by the MS across all Slices. A few things:
    • I think that a special MP is not needed, only MS. But need to validate that. If that's true, then apps could use their own custom MPs freely.
    • I think custom MS may be supported ... all that's required is for the MS to run on PW and whenever it calls its merge(), let PW run the merges across all Slices? But I still need to validate that code.
    • CMS can introduce two-level concurrency. One like today which executes different merges decided by MP concurrently. The other would control the concurrency level those merges are executed on the Slices.
      • Hmm ... even SMS can benefit from that ...
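
A very rough sketch of the two-phase orchestration described in the list above. The ParallelWriter class and its slice handling are hypothetical (they don't exist in Lucene); only IndexWriter's prepareCommit()/commit()/rollback() are real API, shown here for the commit case:

import java.io.IOException;
import java.util.List;
import org.apache.lucene.index.IndexWriter;

// Hypothetical orchestrator over several slice writers (not a real Lucene class).
class ParallelWriter {
  private final List<IndexWriter> slices;

  ParallelWriter(List<IndexWriter> slices) {
    this.slices = slices;
  }

  // Two-phase commit across all Slices: prepare everywhere first, so a failure in
  // any Slice can still be rolled back before the final, cheap "stamp" step.
  void commit() throws IOException {
    try {
      for (IndexWriter slice : slices) {
        slice.prepareCommit();
      }
      for (IndexWriter slice : slices) {
        slice.commit();
      }
    } catch (IOException e) {
      for (IndexWriter slice : slices) {
        slice.rollback();
      }
      throw e;
    }
  }
}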

I realize that accepting only a Directory in PW might limit applications that want to pass in their own IW extension, for whatever reason. But other than saying "if you pass in an IW and configure it afterwards, it's on your head", I don't think there is any other option ... well, maybe unless we expose a package-private API for PW to turn off configuration on an IW after it has set it, so that successive calls to the underlying IW's setters will throw an exception ... hmm, might be doable. I'll look into that. If that works, we might want to do the same for ParallelReader as well.

Michael mentioned a scenario above where one would want to rebuild an index Slice. That's still achievable by this design - one should build the IW on the outside and then replace the Directory instance on PW. We'll need to expose such an API as well.

BTW, some of the things I've mentioned can be taken care of in different issues, as follow-on improvements, such as two-level concurrency, supporting custom MS etc. I've detailed them here just so we all see the bigger picture that's going on in my head.

I think I wrote all (or most) of the high-level details. I'd like to start implementing this soon. In my head it's all chewed and digested, so I feel I can start implementing today. If possible, I'd like to get this out in 3.1. I'll try to break this issue down to as many issues as I can, to make the contributions containable. We should just keep in mind for each such issue the larger picture it solves.

I'd appreciate your comments.


Michael McCandless (@mikemccand) (migrated from JIRA)

I like the ParallelWriter (index slices) approach!

It sounds quite feasible and more "direct" in how the PW controls each
sub-writer. It should be as simple as setting a null merge
policy/scheduler on the subs, which would mean they do no merging themselves,
and then the PW invokes their .merge methods to explicitly merge at
the right times. Versus the current approach, which uses "fake" merge
policy/scheduler implementations (I think?).
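
A minimal sketch of the "null merge policy/scheduler" idea, using NoMergePolicy/NoMergeScheduler, which only appeared in later Lucene versions; the constructor and open() calls below are roughly 5.x-style APIs, and the path is made up:

import java.nio.file.Paths;
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.NoMergePolicy;
import org.apache.lucene.index.NoMergeScheduler;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;

public class SliceWriterSetup {
  public static void main(String[] args) throws Exception {
    // A sub-writer ("slice") that never picks or runs merges on its own:
    Directory sliceDir = FSDirectory.open(Paths.get("slice-0"));
    IndexWriterConfig cfg = new IndexWriterConfig(new StandardAnalyzer())
        .setMergePolicy(NoMergePolicy.INSTANCE)
        .setMergeScheduler(NoMergeScheduler.INSTANCE);
    IndexWriter slice = new IndexWriter(sliceDir, cfg);

    // The PW would then trigger merging explicitly at the right times so that
    // all slices keep the same segment layout:
    slice.maybeMerge();
    slice.close();
  }
}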

Some of this will require IW to open up some APIs – e.g. making docID
assignment a separate method call. Likely many of these will just be
protected APIs w/in IW.


asfimport commented Mar 26, 2010

Michael Busch (migrated from JIRA)

#3400 will be helpful for supporting multi-threaded parallel indexing. If we have single-threaded DocumentsWriters, then it should be easy to have a ParallelDocumentsWriter?


asfimport commented Mar 26, 2010

Shai Erera (@shaie) (migrated from JIRA)

The way I planned to support multi-threaded indexing is to do a two-phase addDocument. First, allocate a doc ID from DocumentsWriter (synchronized), and then add the document to each Slice with that doc ID. DocumentsWriter was not supposed to know it is a parallel index ... something like the following.

int docId = obtainDocId();
for (IndexWriter slice : slices) {
  slice.addDocument(docId, doc);
}

That allows ParallelWriter to be really an orchestrator/manager of all slices, while each slice can be an IW on its own.

Now, when you say ParallelDocumentsWriter, I assume you mean that that DocWriter will be aware of the slices? That I think is an interesting idea, which is unrelated to #3400. I.e., ParallelWriter will invoke its addDocument code which will get down to ParallelDocumentWriter, which will allocate the doc ID itself and call each slice's DocWriter.addDocument? And then #3400 will just improve the performance of that process?

This might require a bigger change to IW than I had anticipated, but perhaps it's worth it.

What do you think?


Grant Ingersoll (@gsingers) (migrated from JIRA)

First off, I haven't looked at the code here or the comments beyond skimming, but this is something I've had in my head for a long time, though I don't have any code. When I think about the whole update problem, I keep coming back to the notion of Photoshop Layers that essentially mask the underlying part of the photo, w/o damaging it. The analogy isn't quite the same here, but nevertheless...

This leads me to wonder if the solution isn't best achieved at the index level and not at the Reader/Writer level.

So, thinking out loud here, and I'm not sure of the best wording for this:
when a document first comes in, it is all in one place, just as it is now. Then, when an update comes in on a particular field, we somehow mark in the index that the document in question has been modified, and we add the new change onto the end of the index (just like we currently do when adding new docs, but this time it's just a doc w/ a single field). Then, when searching, while scoring the affected documents we would go to a secondary process that knows where to look up the incremental changes. As background merging takes place, these "disjoint" documents would be merged back together. We could maybe even have a "high update" merge scheduler that handles these incremental merges more frequently. In a sense, the old field for that document is masked by the new field. I think, given a proper index structure, we could make that marking of the old field fast (maybe it's a pointer to the new field, maybe it's just a bit indicating to go look in the "update" segment).

On the search side, I think performance would still be maintained, b/c even in high-update environments you aren't usually talking about more than a few thousand changes in a minute or two, and the background merger would be responsible for keeping the total number of disjoint documents low.


Shai Erera (@shaie) (migrated from JIRA)

Hi Grant - I believe what you describe relates to solving the incremental field updates problem, where someone might want to change the value of a specific document's field. But PI is not about that. Rather, PI is about updating a whole slice at once, i.e., changing a field's value across all docs, or adding a field to all docs (I believe such a question was asked on the user list a few days ago). I've listed above several scenarios that PI is useful for, but unfortunately it is unrelated to incremental field updates.

If I misunderstood you, then please clarify.

Re incremental field updates, I think your direction is interesting, and deserves discussion, but in a separate issue/thread?


Grant Ingersoll (@gsingers) (migrated from JIRA)

Thanks, Shai, I had indeed misread the intent, and was likely further confused due to the fact that Michael B and I discussed it over tasty Belgian Beer in Oakland. I'll open a discussion on list for incremental field updates.


Michael Busch (migrated from JIRA)

I'll start by describing the limitations of the current design (whether it's the approach or the code is debatable):

FWIW: The attached code and approach were never meant to be committed. I attached them for legal reasons, as they contain the IP that IBM donated to Apache via the software grant. Apache requires attaching the code that is covered by such a grant.

I wouldn't want the master/slave approach in Lucene core. You can implement it much more nicely inside Lucene. The attached code, however, was developed with the requirement of having to run on top of an unmodified Lucene version.

I realized this when I found that if the tests (in this patch) are run with "-ea", many assertion exceptions are printed from IndexWriter.startCommit.
The code runs without exceptions with Lucene 2.4. It doesn't work with 2.9/3.0, but you'll find an upgraded version that works with 3.0 within IBM, Shai.


Shai Erera (@shaie) (migrated from JIRA)

I have found such a version ... and it fails too :). At least the one I received does.

But never mind that ... as long as we both agree the implementation should change. I didn't mean to say anything bad about what you did ... I know the limitations you had to work with.


An Hong Yun (migrated from JIRA)

Hi, Michael

Is there any recent progress on this topic? I am very interested in it!


Eks Dev (migrated from JIRA)

The user mentioned in the comment above was me, I guess. I'm commenting here just to add an interesting use case that would be perfectly solved by this issue.

Imagine a Solr Master-Slave setup, where the full document contains CONTENT and ID fields, e.g. a 200M+ document collection. On the master, we need the ID field indexed in order to process delete/update commands. On the slaves, we do not need lookups on ID and would like to keep our terms dictionary small, without exploding it with 200M+ unique ID terms (ouch, that is a lot compared to the 5M unique terms in CONTENT, with or without pulsing).

With this issue, that could be natively achieved by modifying the Solr UpdateHandler not to transfer the "ID index" to the slaves at all.

There are other ways to fix it, but this would be the best. (I am currently investigating an option to transfer the full index on update but filter out the terms dictionary at the IndexReader level - it remains on disk, but that part never gets accessed on the slaves. I do not know yet whether this is possible at all in general, e.g. an FST-based terms dictionary is already fully built; a prefix-compressed terms dictionary would be doable.)


Steven Rowe (@sarowe) (migrated from JIRA)

Bulk move 4.4 issues to 4.5 and 5.0
