publicvoidaddRequest(SearchComponent me, ShardRequest sreq){ outgoing.add(sreq); if ((sreq.purpose & ShardRequest.PURPOSE_PRIVATE)==0) { // if this isn't a private request, let other components modify it. for (SearchComponent component : components) { if (component != me) { component.modifyRequest( this, me, sreq); } } } }
// Excerpt from SearchHandler.handleRequestBody(): the distributed-search branch.
// Each pass of the do/while loop below is one "stage": components enqueue shard
// requests, the requests are sent out, and the responses are handed back to the
// components until every component reports STAGE_DONE.
if (rb.shards == null) {
  // a normal non-distributed request
  // ... (elided in this excerpt)
} else {
  // a distributed request
  HttpCommComponent comm = new HttpCommComponent();

  if (rb.outgoing == null) {
    rb.outgoing = new LinkedList<ShardRequest>();
  }
  rb.finished = new ArrayList<ShardRequest>();

  int nextStage = 0;
  do {
    rb.stage = nextStage;
    nextStage = ResponseBuilder.STAGE_DONE;

    // call all components
    for (SearchComponent c : components) {
      // the next stage is the minimum of what all components report
      nextStage = Math.min(nextStage, c.distributedProcess(rb));
    }

    // check the outgoing queue and send requests
    while (rb.outgoing.size() > 0) {

      // submit all current request tasks at once
      while (rb.outgoing.size() > 0) {
        ShardRequest sreq = rb.outgoing.remove(0);
        sreq.actualShards = sreq.shards;
        // ALL_SHARDS is compared by identity: it marks "send to every shard of this request"
        if (sreq.actualShards == ShardRequest.ALL_SHARDS) {
          sreq.actualShards = rb.shards;
        }
        sreq.responses = new ArrayList<ShardResponse>();

        // TODO: map from shard to address[]
        for (String shard : sreq.actualShards) {
          ModifiableSolrParams params = new ModifiableSolrParams(sreq.params);
          params.remove(ShardParams.SHARDS); // not a top-level request
          params.remove("indent");
          params.remove(CommonParams.HEADER_ECHO_PARAMS);
          params.set(ShardParams.IS_SHARD, true); // a sub (shard) request
          // shards.qt, when given on the top-level request, sets the handler used on the shards
          String shardHandler = req.getParams().get(ShardParams.SHARDS_QT);
          if (shardHandler == null) {
            params.remove(CommonParams.QT);
          } else {
            params.set(CommonParams.QT, shardHandler);
          }
          // See also CommonsHttpSolrServer.request.
          comm.submit(sreq, shard, params);
        }
      }

      // now wait for replies, but if anyone puts more requests on
      // the outgoing queue, send them out immediately (by exiting
      // this loop)
      while (rb.outgoing.size() == 0) {
        ShardResponse srsp = comm.takeCompletedOrError();
        if (srsp == null) {
          break; // no more requests to wait for
        }

        // Was there an exception? If so, abort everything and
        // rethrow
        if (srsp.getException() != null) {
          comm.cancelAll();
          if (srsp.getException() instanceof SolrException) {
            throw (SolrException) srsp.getException();
          } else {
            throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, srsp.getException());
          }
        }

        rb.finished.add(srsp.getShardRequest());

        // let the components see the responses to the request
        for (SearchComponent c : components) {
          c.handleResponses(rb, srsp.getShardRequest());
        }
      }
    }

    for (SearchComponent c : components) {
      c.finishStage(rb);
    }

    // we are done when the next stage is MAX_VALUE
    // BTW ResponseBuilder.STAGE_DONE == Integer.MAX_VALUE
  } while (nextStage != Integer.MAX_VALUE);
}
3、其他
有意思的是，我从 Solr 3.5 的 reference 中看到这样一句话：
It is up to you to get all your documents indexed on each shard of your server farm. Solr does not include out-of-the-box support for distributed indexing, but your method can be as simple as a round robin technique. Just index each document to the next server in the circle.
SolrCloud is the name of a set of new distributed capabilities in Solr. Passing parameters to enable these capabilities will enable you to set up a highly available, fault tolerant cluster of Solr servers. Use SolrCloud when you want high scale, fault tolerant, distributed indexing and search capabilities.