2015-04-21 5 views
12

Я пытаюсь передать безопасные параметры из tasklet на шаг в той же работе.Как безопасно передавать параметры от Tasklet до шага при выполнении параллельных заданий

Моя работа состоит 3 тасклетов (step1, step2, Step3) один за другим и в конце концов step4 (процессор, читатель, писатель)

эта работа ведется во много раз выполняются параллельно.

В step1 внутри тасклет Я подсчитываю пары (Хашид) с помощью веб-сервиса), чем я передаю все это по моей цепи до моего читателя (который на шаге 4)

На шаге 3 Я создаю новый параметр вызванный: filePath, который основан на hashid, и я отправляю его на step4 (читатель) в качестве местоположения файлового ресурса

Я использую stepExecution для передачи этого параметра (hashId и filePath).

Я попытался 3 способа делать это с помощью тасклета:

скоротать параметры (Хашид из step1 в step2 и от step2 в шаг 3) Я делаю это:

chunkContext.getStepContext() 
     .getStepExecution() 
     .getExecutionContext() 
     .put("hashId", hashId); 

In step4 I я заселение Filepath на основе Хашида и передать его таким образом, чтобы мой последний шаг (который является процессор читателя и писателем)

public class DownloadFileTasklet implements Tasklet, StepExecutionListener { 
.. 

    @Override 
    public RepeatStatus execute(ChunkContext chunkContext, ExecutionContext  
    executionContext) throws IOException { 

    String hashId = chunkContext.getStepContext().getStepExecution().getJobExecution().getExecutionContext().get("hashId"); 

      ... 

filepath="...hashId.csv"; 
//I used here executionContextPromotionListener in order to promote those keys 

     chunkContext.getStepContext() 
     .getStepExecution() 
     .getExecutionContext() 
     .put("filePath", filePath); 
    } 

logger.info("filePath + "for hashId=" + hashId); 

} 
@Override 
public void beforeStep(StepExecution stepExecution) { 
    this.stepExecution = stepExecution; 
} 

Обратите внимание, что я печать значения Хашида и Filepath права быть Я закончил этот шаг (шаг 3). по логике они согласованы и заселены, как ожидалось

Я также добавил журналы в своем читателе, чтобы просмотреть параметры, которые я получаю.

@Bean 
    @StepScope 
    public ItemStreamReader<MyDTO> reader(@Value("#{jobExecutionContext[filePath]}") String filePath) { 
       logger.info("test filePath="+filePath+"); 

     return itemReader; 
    } 

Когда я выполняю эту работу ~ 10 раз я могу видеть, что значение парам Filepath заполняется другими заданиями Filepath значений при выполнении параллельного

Это, как я продвигаю ключи о задании с executionContextPromotionListener:

определение

работы:

@Bean 
    public Job processFileJob() throws Exception { 
     return this.jobs.get("processFileJob"). 
       start.(step1). 
       next(step2) 
       next(downloadFileTaskletStep()). //step3 
       next(processSnidFileStep()).build(); //step4 

    } 

шаг 3 определения

public Step downloadFileTaskletStep() { 
     return this.steps.get("downloadFileTaskletStep").tasklet(downloadFileTasklet()).listener(executionContextPromotionListener()).build(); 
    } 


    @Bean 
    public org.springframework.batch.core.listener.ExecutionContextPromotionListener executionContextPromotionListener() { 
     ExecutionContextPromotionListener executionContextPromotionListener = new ExecutionContextPromotionListener(); 
     executionContextPromotionListener.setKeys(new String[]{"filePath"}); 
     return executionContextPromotionListener; 
    } 

Одинаковые результаты нити Мессинг в Params

я могу отслеживать результаты через таблицу базы данных весной пакетном: batch_job_execution_context.short_context:

здесь вы можете увидеть на filePatch, который построен по Хашид не совпадает в начало hashId // неправильная запись ///

{"map": [{"entry": [{"string": "totalRecords", "int": 5}, {"string": " segmentId "," long ": 13}, {" string ": [" filePath ","/etc/mydir/services/notification_processor/files/2015_04_22/f1c7b0f2180b7e266d36f87fcf6fb7aa.CSV "]}, {" строка ": [" Хашид "" 20df39d201fffc7444423cfdf2f43789 «]}]}]}

Теперь, если мы проверяем другие записи, которые они, кажется, хорошо, но всегда один или два перепутались

.

// правильные записи

{"map":[{"entry":[{"string":"totalRecords","int":5},{"string":"segmentId","long":13},{"string":["filePath","\/etc\/mydir\/services\/notification_processor\/files\/2015_04_22\/**c490c8282628b894727fc2a4d6fc0cb5**.csv"]},{"string":["hashId","**c490c8282628b894727fc2a4d6fc0cb5**"]}]}]} 

{"map":[{"entry":[{"string":"totalRecords","int":5},{"string":"segmentId","long":13},{"string":["filePath","\/etc\/mydir\/services\/notification_processor\/files\/2015_04_22\/**2b21d3047208729192b87e90e4a868e4**.csv"]},{"string":["hashId","**2b21d3047208729192b87e90e4a868e4**"]}]}]} 

Любая идея, почему у меня есть эти резьбонарезной вопросы

+0

Как именно вы используете свою «работу» одновременно? Каждое задание будет иметь отдельный контекст, поэтому параметры будут уникальными для каждого выполнения задания. – Palcente

+0

Я просто выполняю одну и ту же работу много раз сразу через jobLauncher. Исходные параметры безопасны. но я создаю новый параметр (filePath) внутри таска, и я должен отправить его моему читателю, который на следующем шаге. и это значение несовместимо, когда я выполняю много заданий параллельно. как не потокобезопасный – rayman

+0

просто из любопытства попробуйте '@Bean @Scope (« prototype »)' – Palcente

ответ

2

Чтобы просмотреть пытавшихся методы:

  • Способ 1 - Редактирование JobParameters JobParameters неизменяемы в задании, поэтому попытка их изменения во время выполнения задания не должна предприниматься.
  • Способ 2 - Редактирование JobParameters v2 Метод 2 действительно такой же, как метод 1, вы только собираетесь получить ссылку на JobParameters по-другому.
  • Способ 3 - Использование ExecutionContextPromotionListener. Это правильный путь, но вы делаете неправильно. ExecutionContextPromotionListener смотрит на шагExecutionContext и копирует указанные ключи в ExecutionContext. Вы добавляете ключи непосредственно в Job ExecutionContext, что является плохой идеей.

Короче говоря, метод 3 является самым близким, чтобы исправить, но вы должны добавлять свойства, которые вы хотите поделиться на СТЭП ExecutionContext, а затем настроить ExecutionContextPromotionListener продвигать соответствующие ключи от Иова ExecutionContext.

код будет обновляться следующим образом:

chunkContext.getStepContext() 
      .getStepExecution() 
      .getExecutionContext() 
      .put("filePath", filePath); 
+0

@ Спасибо за то, что дали мне преимущество! просто одно уточнение: «вы должны добавлять свойства, которые хотите поделиться с ExecutionContext шага». Не уверен, что я понял, как реализовать ваше предложение. Я настраиваю ExecutionContextPromotionListener с помощью ключей и добавляет его в качестве слушателя в файл downloadFileTaskletStep. Как бы вы изменили эту реализацию? Спасибо!! – rayman

+0

Так что я должен использовать chunkContext, а не stepExecution, который я инициализировал на предыдущем этапе – rayman

+0

Привет, Майкл, я попробовал. Я выполнял эти задания 14 раз подряд, и я вижу, что одна запись непоследовательна. это означает, что внутри tasklet я печатаю hashid = x, и когда я добираюсь до шага (внутри одной и той же работы), я печатаю hashid = y (другого выполнения задания). Я могу отслеживать это через таблицу базы данных: batch_job_execution_context и coulmn short_context – rayman