2015-09-11 1 views
0

Глядя на этот поток ...Почему я могу читать/обрабатывать только один файл из SI MessageSource?

public Date nextExecutionTime(TriggerContext triggerContext) { 
    return this.invoked.getAndSet(true) ? null : new Date(); 
} 

@Bean 
public IntegrationFlow mainFlow() { 
    JsonObjectMapper<?, ?> jsonObjectMapper = new Jackson2JsonObjectMapper(objectMapper); 
    // @formatter:off 
    return IntegrationFlows 
      .from(
        amazonS3InboundSynchronizationMessageSource(), 
        e -> e.poller(p -> p.trigger(this::nextExecutionTime)) 
      ) 
      .channel(LoggingUtils.createLoggingMessageChannel("File:::")) 
      .transform(new FileToInputStreamTransformer()) 
      .split(new FileSplitter(), null) 
      .channel(c -> c.executor(Executors.newFixedThreadPool(10))) 
      .transform(Transformers.fromJson(persistentType(), jsonObjectMapper)) 
      .handle(LoggingUtils.createLoggingMessageHandler("Parsed JSON record #")) 
      //.handle(jdbcRepositoryHandler()) 
      //.publishSubscribeChannel(p -> p.subscribe(persistenceSubFlow())) 
      .get(); 
    // @formatter:on 
}  

Почему я только смог прочесть один файл?

Даже если сконфигурированный MessageSource (a AmazonS3InboundSynchronizationMessageSource) записывает более одного файла в локальный каталог.

выходного Пример консоль

 
2015-09-11 09:52:59,856 [task-scheduler-1] org.springframework.integration.aws.s3.InboundFileSynchronizationImpl INFO Sync completed 
2015-09-11 09:52:59,860 [task-scheduler-1] org.springframework.integration.handler.LoggingHandler INFO Event: [File:::] - Message: [GenericMessage [payload=/Users/cphi/development/projects/expedia/git/luis-data-migration-service/target/s3-dump/RatePlanLevelRestrictionLog/2015/08/23/00/2015-08-22-23-58-0.302402118982895.gz, headers={id=e58c332b-c217-8059-c4e8-09bba2c430a0, timestamp=1441990379859}]] 
2015-09-11 09:52:59,918 [pool-2-thread-8] org.springframework.integration.handler.LoggingHandler INFO Event: [Parsed JSON record #] - Message: [GenericMessage [payload=RatePlanLevelRestrictionLog[roomTypeId=,ratePlanId=201744463,stayDate=Wed Sep 02 17:00:00 PDT 2015,ratePlanLevel=0,hotelId=4469515,rprLogSeqNum=16,logActionTypeId=2,sellStateId=1,startAllowed=,endAllowed=,fplosMaskArrival=,fplosMaskStayThrough=,doaCostPriceChanged=,supplierUpdateDate=Sat Aug 22 07:57:24 PDT 2015,supplierUpdateTuid=68630676,createDate=Sat Aug 22 07:57:24 PDT 2015,changeRequestId=31461011173,changeRequestSourceId=], headers={sequenceNumber=8, file_name=2015-08-22-23-58-0.302402118982895.gz, sequenceSize=0, correlationId=16e44a80-2669-b2bf-f2bf-f12fe6bb4510, file_originalFile=/Users/cphi/development/projects/expedia/git/luis-data-migration-service/target/s3-dump/RatePlanLevelRestrictionLog/2015/08/23/00/2015-08-22-23-58-0.302402118982895.gz, id=6b866d25-07e8-22a4-381c-d26205393f3b, timestamp=1441990379898}]] 
2015-09-11 09:52:59,919 [pool-2-thread-3] org.springframework.integration.handler.LoggingHandler INFO Event: [Parsed JSON record #] - Message: [GenericMessage [payload=RatePlanLevelRestrictionLog[roomTypeId=,ratePlanId=1030513,stayDate=Wed Aug 26 17:00:00 PDT 2015,ratePlanLevel=0,hotelId=1615126,rprLogSeqNum=6,logActionTypeId=2,sellStateId=0,startAllowed=,endAllowed=,fplosMaskArrival=,fplosMaskStayThrough=,doaCostPriceChanged=,supplierUpdateDate=Sat Aug 22 07:57:35 PDT 2015,supplierUpdateTuid=46712703,createDate=Sat Aug 22 07:57:35 PDT 2015,changeRequestId=31461014045,changeRequestSourceId=], headers={sequenceNumber=3, file_name=2015-08-22-23-58-0.302402118982895.gz, sequenceSize=0, correlationId=16e44a80-2669-b2bf-f2bf-f12fe6bb4510, file_originalFile=/Users/cphi/development/projects/expedia/git/luis-data-migration-service/target/s3-dump/RatePlanLevelRestrictionLog/2015/08/23/00/2015-08-22-23-58-0.302402118982895.gz, id=ddf1ee98-55c4-81de-af77-a886a340fe07, timestamp=1441990379897}]] 
2015-09-11 09:52:59,919 [pool-2-thread-2] org.springframework.integration.handler.LoggingHandler INFO Event: [Parsed JSON record #] - Message: [GenericMessage [payload=RatePlanLevelRestrictionLog[roomTypeId=,ratePlanId=163007,stayDate=Fri Dec 11 16:00:00 PST 2015,ratePlanLevel=0,hotelId=897973,rprLogSeqNum=3,logActionTypeId=2,sellStateId=0,startAllowed=,endAllowed=,fplosMaskArrival=,fplosMaskStayThrough=,doaCostPriceChanged=,supplierUpdateDate=Sat Aug 22 07:57:16 PDT 2015,supplierUpdateTuid=46712703,createDate=Sat Aug 22 07:57:16 PDT 2015,changeRequestId=31461009374,changeRequestSourceId=], headers={sequenceNumber=2, file_name=2015-08-22-23-58-0.302402118982895.gz, sequenceSize=0, correlationId=16e44a80-2669-b2bf-f2bf-f12fe6bb4510, file_originalFile=/Users/cphi/development/projects/expedia/git/luis-data-migration-service/target/s3-dump/RatePlanLevelRestrictionLog/2015/08/23/00/2015-08-22-23-58-0.302402118982895.gz, id=d7d7a418-6593-bc57-fc7d-e181778be0c8, timestamp=1441990379899}]] 
...    

содержимого каталог

.../target/s3-dump/RatePlanLevelRestriction 
+- 2015 
+-- 08 
+--- 23 
+---- 00 
+----- 2015-08-22-23-58-0.302402118982895.gz 
+----- 2015-08-22-23-58-0.302992661055088.gz 
+----- 2015-08-22-23-58-0.303107496339691.gz                                

Если вам интересно, вот г для:

ответ

0

Увеличение maxMessagesPerPoll на Poller (по умолчанию 1).

+0

Если я это сделаю; как бы я знал, какими должны быть максимальные сообщения? Каждый синхронизированный каталог может содержать вариантное количество файлов. Думаю, вы устанавливаете ограничение количества файлов, которые могут быть обработаны в одном опросе? Я попробовал, и он обработал более 1 файл, но я все еще пытаюсь примирить, если я слишком сильно уставил «maxMessagesPerPoll». У меня все еще была проблема с оригиналом? Я также попробовал 'p.trigger (Pollers.fixedDelay (2000)' и увидел, что одни и те же файлы были опрошены, но что они не были обработаны нисходящим потоком b/c в том, как 'InboundFileSynchronizer' работает внутри S3' MessageSource' impl. –

+0

Вам просто нужно установить 'mmpp' достаточно большой (или' -1' для неограниченного - значит, продолжать обрабатывать файлы до тех пор, пока не будут обнаружены новые). –

+0

Спасибо, Гэри! Но не должно быть '-1' по умолчанию? –