2013-11-26 3 views
1

У меня есть требование импортировать большой объем данных из базы данных mysql и индексировать документы (около 1000 документов). Во время процесса индексирования мне нужно выполнить специальную обработку поля, отправив запросы расширения на внешний сервер Apache Stanbol. Я сконфигурировал свой обработчик dataimport в файле solrconfig.xml для использования StanbolContentProcessor в цепочке обновлений, как показано ниже;Как сжимать dataimports в Solr с помощью batchSize

<updateRequestProcessorChain name="stanbolInterceptor"> 
    <processor class="com.solr.stanbol.processor.StanbolContentProcessorFactory"/> 
    <processor class="solr.RunUpdateProcessorFactory" /> 
</updateRequestProcessorChain> 

<requestHandler name="/dataimport" class="solr.DataImportHandler"> 
    <lst name="defaults"> 
     <str name="config">data-config.xml</str> 
     <str name="update.chain">stanbolInterceptor</str> 
    </lst> 
</requestHandler> 

Образец данных-config.xml приведен ниже:

<dataConfig> 
    <dataSource type="JdbcDataSource" driver="com.mysql.jdbc.Driver" 
       url="jdbc:mysql://localhost:3306/solrTest" 
       user="test" password="test123" batchSize="1" /> 
    <document name="stanboldata"> 
     <entity name="stanbolrequest" query="SELECT * FROM documents"> 
      <field column="id" name="id" /> 
      <field column="content" name="content" /> 
      <field column="title" name="title" /> 
     </entity> 
    </document> 
</dataConfig> 

При запуске большого импорта около 1000 документов, мой stanbol сервер идет вниз, я подозреваю, что из-за большой нагрузки от выше Solr Stanbolnterceptor. Я хотел бы дросселировать dataimport партиями, так что Stanbol может обрабатывать управляемое количество запросов одновременно.

Возможно ли это, используя параметр batchSize в элементе dataSource в data-config?

Может кто-нибудь, пожалуйста, дайте несколько идей для дросселирования нагрузки dataimport в Solr?

Это мой пользовательский UpdateProcessor класс обработки запросов Stanbol во время/dataimport

public class StanbolContentProcessorFactory extends 
     UpdateRequestProcessorFactory { 

    public static final String NLP_ORGANIZATION = "nlp_organization"; 
    public static final String NLP_PERSON = "nlp_person"; 
    public static final String[] STANBOL_REQUEST_FIELDS = { "title", "content" }; 
    public static final String STANBOL_ENDPOINT = "http://localhost:8080/enhancer"; 

    @Override 
    public UpdateRequestProcessor getInstance(SolrQueryRequest req, 
      SolrQueryResponse res, UpdateRequestProcessor next) { 

     return new StanbolContentProcessor(next); 
    } 

    class StanbolContentProcessor extends UpdateRequestProcessor { 

     public StanbolContentProcessor(UpdateRequestProcessor next) { 
      super(next); 
     } 

     @Override 
     public void processAdd(AddUpdateCommand cmd) throws IOException { 
      SolrInputDocument doc = cmd.getSolrInputDocument(); 
      String request = ""; 
      for (String field : STANBOL_REQUEST_FIELDS) { 
       if (null != doc.getFieldValue(field)) { 
        request += (String) doc.getFieldValue(field) + ". "; 
       } 

      } 
      try { 
       EnhancementResult result = stanbolPost(request, getBaseURI()); 
       Collection<TextAnnotation> textAnnotations = result 
         .getTextAnnotations(); 
       // extracting text annotations 
       Set<String> personSet = new HashSet<String>(); 
       Set<String> orgSet = new HashSet<String>(); 
       for (TextAnnotation text : textAnnotations) { 
        String type = text.getType(); 
        String selectedText = text.getSelectedText(); 

        if (null != type && null != selectedText) { 
         if (type.equalsIgnoreCase(StanbolConstants.DBPEDIA_PERSON) 
           || type.equalsIgnoreCase(StanbolConstants.FOAF_PERSON)) { 
          personSet.add(selectedText); 

         } else if (type 
           .equalsIgnoreCase(StanbolConstants.DBPEDIA_ORGANIZATION) 
           || type.equalsIgnoreCase(StanbolConstants.FOAF_ORGANIZATION)) { 
          orgSet.add(selectedText); 

         } 
        } 
       } 
       for (String person : personSet) { 
        doc.addField(NLP_PERSON, person); 
       } 
       for (String org : orgSet) { 
        doc.addField(NLP_ORGANIZATION, org); 
       } 
       cmd.solrDoc = doc; 
       super.processAdd(cmd); 
      } catch (Exception ex) { 
       ex.printStackTrace(); 
      } 
     } 

    } 

    private EnhancementResult stanbolPost(String request, URI uri) { 
     Client client = Client.create(); 
     WebResource webResource = client.resource(uri); 
     ClientResponse response = webResource.type(MediaType.TEXT_PLAIN) 
       .accept(new MediaType("application", "rdf+xml")) 
       .entity(request, MediaType.TEXT_PLAIN) 
       .post(ClientResponse.class); 

     int status = response.getStatus(); 
     if (status != 200 && status != 201 && status != 202) { 
      throw new RuntimeException("Failed : HTTP error code : " 
        + response.getStatus()); 
     } 
     String output = response.getEntity(String.class); 
     // Parse the RDF model 

     Model model = ModelFactory.createDefaultModel(); 
     StringReader reader = new StringReader(output); 
     model.read(reader, null); 
     return new EnhancementResult(model); 

    } 


    private static URI getBaseURI() { 
     return UriBuilder.fromUri(STANBOL_ENDPOINT).build(); 
    } 

} 

ответ

3

Опция batchSize используется для извлечения строк из таблицы базы данных в пакетном режиме, с тем чтобы уменьшить использование памяти (она часто используется для предотвращение нехватки памяти при запуске обработчика импорта данных). В то время как более низкий размер партии может быть медленнее, этот параметр не влияет на скорость процесса импорта.

Мое предложение было бы ограничить запросы другим способом, например, с использованием правила брандмауэра. Если вы используете Linux и иметь доступ к Netfilter, вы можете запустить что-то вроде следующей команды:

iptables -A INPUT -p tcp --dport 12345 -m limit --limit 10/s -j ACCEPT 

Где 12345 'является портом Stanbol и '10/s'это количество пакетов в секунду, чтобы принять ,

3

Mowgli is right, batchsize не поможет вам с этим. Так как большинство людей столкнулись с проблемой, наоборот (например, My dataimport is too slow, please help), в Solr ничего подобного нет. По крайней мере, я ничего не знаю.


Лично я бы не стал настраивать вашу систему Linux для обработки дросселирования для вас. Если вы переходите со сцены на сцену или переходите на другой сервер, когда вам нужно помнить об этом. И если люди меняются в течение всей жизни вашей системы, они не будут знать этого.

Итак, я не знаю код вашего StanbolContentProcessorFactory, но, как уже упоминалось, in your other question это нестандартный код. Поскольку это ваш собственный код, вы можете добавить механизм дроссельной заслонки. Чтобы подробнее рассказать об этом, мне нужен какой-то код для просмотра.


Update

Solr имеет гуавы Google, поэтому я хотел бы использовать RateLimiteras proposed here.Если вы строите Maven, это означает, что вы можете использовать область provided. Если вы не используете Maven, вам не нужно делать fatjar или размещать guava с папкой lib Solr.

import com.google.common.util.concurrent.RateLimiter; 

public class StanbolContentProcessorFactory extends 
    UpdateRequestProcessorFactory { 

    // ... 

    // add a rate limiter to throttle your requests 
    // this setting would allow 10 requests per second 
    private RateLimiter throttle = RateLimiter.create(0.1); 

    // ... 

    private EnhancementResult stanbolPost(String request, URI uri) { 
     Client client = Client.create(); 

     // this will throttle your requests 
     throttle.acquire(); 

     WebResource webResource = client.resource(uri); 
     ClientResponse response = webResource.type(MediaType.TEXT_PLAIN) 
      .accept(new MediaType("application", "rdf+xml")) 
      .entity(request, MediaType.TEXT_PLAIN) 
      .post(ClientResponse.class); 

     int status = response.getStatus(); 
     if (status != 200 && status != 201 && status != 202) { 
      throw new RuntimeException("Failed : HTTP error code : " 
       + response.getStatus()); 
     } 
     String output = response.getEntity(String.class); 
     // Parse the RDF model 
     Model model = ModelFactory.createDefaultModel(); 
     StringReader reader = new StringReader(output); 
     model.read(reader, null); 
     return new EnhancementResult(model); 
} 
+0

Я добавил класс здесь для reference..stanbolPost() метод отвечает за подключение к серверу Stanbol и возвращать результаты аксессуара, в то время как документ обновляется с новыми полями (nlp_person, nlp_organization) с помощью enhancementResult в методе processAdd(). –