2015-03-17 3 views
0

Я создал простую партию, используя реализацию Spring Batch JSR-352. Пакет использует PartitionMapper для ввода отдельного свойства для каждого фрагмента. В соответствии со спецификациями JSR-352 (когда partitionsOverride = False), когда один из фрагментов раздела выходит из строя, и пакет перезагружается только, поврежденные фрагменты раздела должны перезапускаться.Spring Batch JSR-352 перезапускает неправильные разделы

Например, если у нас есть 3 раздела: partition0, partition1 и partition2. Если разделы partition1 и partition2 не работают, пакет должен только перезапустить разделы1 и partion2 со своими собственными свойствами пакета.

Однако я заметил, что при использовании реализации Spring Batch JSR-352 (последняя версия 3.0.3.Release) перезапуск пакета перезапустит разделы0 и partition1 вместо partition1 и partition2. Таким образом, он правильно обнаруживает, что два раздела потерпели неудачу, но он неправильно перезапускает первые (два) раздела вместо неудачных разделов.

Является ли это ошибкой в ​​реализации Spring Batch или я что-то упускаю?

См JSR-352 Документах раздел 10.8.5: http://download.oracle.com/otn-pub/jcp/batch-1_0_revA-mrel-spec/JSR_352-v1.0_Rev_a-Maintenance_Release.pdf

Вот код, который я использовал:

/META-INF/batch-jobs/sampleBatch.xml

<?xml version="1.0" encoding="UTF-8"?> 
<job id="sampleBatch" version="1.0" xmlns="http://xmlns.jcp.org/xml/ns/javaee" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee http://xmlns.jcp.org/xml/ns/javaee/jobXML_1_0.xsd"> 

<step id="sampleStep"> 
    <chunk item-count="100"> 
     <reader ref="com.springapp.batch.SampleReader"> 
      <properties> 
       <property name="sample" value="#{partitionPlan['sample']}"/> 
      </properties> 
     </reader> 
     <writer ref="com.springapp.batch.SampleWriter"> 
      <properties> 
       <property name="sample" value="#{partitionPlan['sample']}"/> 
      </properties> 
     </writer> 
    </chunk> 
    <partition> 
     <mapper ref="com.springapp.batch.SamplePartitionMapper"/> 
    </partition> 
</step> 
</job> 

com.springapp.batch.SamplePartitionMapper:

package com.springapp.batch; 

import java.util.Properties; 
import javax.batch.api.partition.PartitionMapper; 
import javax.batch.api.partition.PartitionPlan; 
import javax.batch.api.partition.PartitionPlanImpl; 

public class SamplePartitionMapper implements PartitionMapper { 

@Override 
public PartitionPlan mapPartitions() throws Exception { 
    final PartitionPlan partitionPlan = new PartitionPlanImpl(); 

    int size = 3; 
    Properties[] partitionProps = new Properties[size]; 

    for (int i=0; i<size; i++) { 
     final Properties properties = new Properties(); 
     properties.put("sample", ""+i); 
     partitionProps[i] = properties; 
     System.out.println("mapPartitions: " + i); 
    } 

    partitionPlan.setThreads(1); 
    partitionPlan.setPartitions(partitionProps.length); 
    partitionPlan.setPartitionProperties(partitionProps); 

    return partitionPlan; 
} 
} 

com.springapp.batch.SampleReader:

public class SampleReader extends AbstractItemReader { 

@Inject 
@BatchProperty 
private String sample; 

Iterator<Integer> iter; 

@Override 
public void open(Serializable checkpoint) throws Exception { 
    System.out.println("open for reading sample: " + sample); 
    ArrayList list = new ArrayList<Integer>(); 
    for(int i=0; i<Integer.parseInt(sample); i++) { 
     list.add(new Integer(i)); 
    } 
    iter = list.iterator(); 
} 

@Override 
public Integer readItem() throws Exception { 
    if(iter.hasNext()) 
     return iter.next(); 
    else 
     return null; 
} 
} 

com.springapp.batch.SampleWriter:

public class SampleWriter extends AbstractItemWriter { 

@Inject 
@BatchProperty 
private String sample; 

@Override 
public void writeItems(List<Object> items) throws Exception { 
    System.out.println("writeItems sample: " + sample); 

    if(sample.equals("1")) { 
     throw new Exception("FAIL PARTITION 1"); 
    } 
    if(sample.equals("2")) { 
     throw new Exception("FAIL PARTITION 2"); 
    } 

    for (Object itemObj : items) { 
     Integer item = (Integer) itemObj; 
     System.out.println(item); 
    } 
} 
} 

TestJob TestRunner:

package com.springapp.batch; 

import java.util.Properties; 
import java.util.concurrent.TimeUnit; 
import java.util.concurrent.TimeoutException; 

import javax.batch.operations.JobOperator; 
import javax.batch.runtime.BatchRuntime; 
import javax.batch.runtime.JobExecution; 
import javax.inject.Inject; 

import junit.framework.Assert; 
import org.junit.Test; 

//@RunWith(SpringJUnit4ClassRunner.class) 
//@ContextConfiguration("classpath:spring-config.xml") 
public class AppTests { 

@Test 
public void testJob() throws Exception { 
    JobOperator jobOperator = BatchRuntime.getJobOperator(); 
    long jobExecution = jobOperator.start("sampleBatch", new Properties()); 

    int attempt = 0; 
    while (true) { 
     JobExecution execution = jobOperator.getJobExecution(jobExecution); 
     if (execution.getEndTime() != null) { 
      //check status 
      if("FAILED".equals(execution.getExitStatus()) && attempt < 3) { 
       attempt++; 
       System.out.println("Batch failed, trying to restart (attempt " + attempt + ").."); 
       jobExecution = jobOperator.restart(jobExecution, new Properties()); 
       continue; 
      } 
      System.out.println("Batch ended with status: " + execution.getExitStatus()); 
      break; 
     } 
    } 
    Assert.assertEquals("COMPLETED", jobOperator.getJobExecution(jobExecution).getExitStatus()); 
} 
} 

ответ

0

Глядя на это, похоже, ошибка , Я зарегистрировал Jira issue BATCH-2364, чтобы отслеживать его.