У меня есть требование импортировать большой объем данных из базы данных mysql и индексировать документы (около 1000 документов). Во время процесса индексирования мне нужно выполнить специальную обработку поля, отправив запросы расширения на внешний сервер Apache Stanbol. Я сконфигурировал свой обработчик dataimport в файле solrconfig.xml для использования StanbolContentProcessor в цепочке обновлений, как показано ниже;Как сжимать dataimports в Solr с помощью batchSize
<updateRequestProcessorChain name="stanbolInterceptor">
<processor class="com.solr.stanbol.processor.StanbolContentProcessorFactory"/>
<processor class="solr.RunUpdateProcessorFactory" />
</updateRequestProcessorChain>
<requestHandler name="/dataimport" class="solr.DataImportHandler">
<lst name="defaults">
<str name="config">data-config.xml</str>
<str name="update.chain">stanbolInterceptor</str>
</lst>
</requestHandler>
Образец данных-config.xml приведен ниже:
<dataConfig>
<dataSource type="JdbcDataSource" driver="com.mysql.jdbc.Driver"
url="jdbc:mysql://localhost:3306/solrTest"
user="test" password="test123" batchSize="1" />
<document name="stanboldata">
<entity name="stanbolrequest" query="SELECT * FROM documents">
<field column="id" name="id" />
<field column="content" name="content" />
<field column="title" name="title" />
</entity>
</document>
</dataConfig>
При запуске большого импорта около 1000 документов, мой stanbol сервер идет вниз, я подозреваю, что из-за большой нагрузки от выше Solr Stanbolnterceptor. Я хотел бы дросселировать dataimport партиями, так что Stanbol может обрабатывать управляемое количество запросов одновременно.
Возможно ли это, используя параметр batchSize в элементе dataSource в data-config?
Может кто-нибудь, пожалуйста, дайте несколько идей для дросселирования нагрузки dataimport в Solr?
Это мой пользовательский UpdateProcessor класс обработки запросов Stanbol во время/dataimport
public class StanbolContentProcessorFactory extends
UpdateRequestProcessorFactory {
public static final String NLP_ORGANIZATION = "nlp_organization";
public static final String NLP_PERSON = "nlp_person";
public static final String[] STANBOL_REQUEST_FIELDS = { "title", "content" };
public static final String STANBOL_ENDPOINT = "http://localhost:8080/enhancer";
@Override
public UpdateRequestProcessor getInstance(SolrQueryRequest req,
SolrQueryResponse res, UpdateRequestProcessor next) {
return new StanbolContentProcessor(next);
}
class StanbolContentProcessor extends UpdateRequestProcessor {
public StanbolContentProcessor(UpdateRequestProcessor next) {
super(next);
}
@Override
public void processAdd(AddUpdateCommand cmd) throws IOException {
SolrInputDocument doc = cmd.getSolrInputDocument();
String request = "";
for (String field : STANBOL_REQUEST_FIELDS) {
if (null != doc.getFieldValue(field)) {
request += (String) doc.getFieldValue(field) + ". ";
}
}
try {
EnhancementResult result = stanbolPost(request, getBaseURI());
Collection<TextAnnotation> textAnnotations = result
.getTextAnnotations();
// extracting text annotations
Set<String> personSet = new HashSet<String>();
Set<String> orgSet = new HashSet<String>();
for (TextAnnotation text : textAnnotations) {
String type = text.getType();
String selectedText = text.getSelectedText();
if (null != type && null != selectedText) {
if (type.equalsIgnoreCase(StanbolConstants.DBPEDIA_PERSON)
|| type.equalsIgnoreCase(StanbolConstants.FOAF_PERSON)) {
personSet.add(selectedText);
} else if (type
.equalsIgnoreCase(StanbolConstants.DBPEDIA_ORGANIZATION)
|| type.equalsIgnoreCase(StanbolConstants.FOAF_ORGANIZATION)) {
orgSet.add(selectedText);
}
}
}
for (String person : personSet) {
doc.addField(NLP_PERSON, person);
}
for (String org : orgSet) {
doc.addField(NLP_ORGANIZATION, org);
}
cmd.solrDoc = doc;
super.processAdd(cmd);
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
private EnhancementResult stanbolPost(String request, URI uri) {
Client client = Client.create();
WebResource webResource = client.resource(uri);
ClientResponse response = webResource.type(MediaType.TEXT_PLAIN)
.accept(new MediaType("application", "rdf+xml"))
.entity(request, MediaType.TEXT_PLAIN)
.post(ClientResponse.class);
int status = response.getStatus();
if (status != 200 && status != 201 && status != 202) {
throw new RuntimeException("Failed : HTTP error code : "
+ response.getStatus());
}
String output = response.getEntity(String.class);
// Parse the RDF model
Model model = ModelFactory.createDefaultModel();
StringReader reader = new StringReader(output);
model.read(reader, null);
return new EnhancementResult(model);
}
private static URI getBaseURI() {
return UriBuilder.fromUri(STANBOL_ENDPOINT).build();
}
}
Я добавил класс здесь для reference..stanbolPost() метод отвечает за подключение к серверу Stanbol и возвращать результаты аксессуара, в то время как документ обновляется с новыми полями (nlp_person, nlp_organization) с помощью enhancementResult в методе processAdd(). –