2010-07-30 1 views
2

Я новичок в способе разработки Java/Hibernate/Seam, но у меня возникла странная проблема с Hibernate и параллельными потоками.Seam Hibernate Служит одному экземпляру EntityManger для двух отдельных потоков

У меня есть компонент Scope, связанный с приложением, который выполняется через таймеры EJB с заданным интервалом (Orchestrator.java), вызывающим метод startProcessingWorkloads.

Этого метод имеет впрыскиваемый EntityManager, который он использует для проверки базы данных для сбора данных, и если он находит коллекцию работы она создает новый асинхронный Seam компонент (LoadContoller.java) и выполняет старта() метод на контроллере

LoadController имеет EntityManager впрыскиваемого и использовать его для выполнения очень большой сделки (около одного часа)

После того, как LoadController работает как отдельный поток, то Orchestrator все еще выполнен в виде нити на заданный интервал, например

1min
Orchestrator - выглядит для сбора работы (Ничего не найдено) (нить 1)


2мин
Orchestrator - выглядит для сбора работы (находит, начинает LoadController) (тема 1)
LoadController - Запуск обновления базы данных записей (нить 2)


3min
Orchestrator - Смотрит для сбора работ (None Found) (Тема 1)
LoadController - Тем не менее обновления записей базы данных (Тема 2)

4min
Orchestrator - Смотрит для коллекции работ (Ничего не найдено) (Тема 1)
LoadController - Тем не менее обновление записей базы данных (Тема 2)


5мин
Orchestrator - Похоже на работу коллекция (Ничего не найдено) (Тема 1)
LoadController - Готово обновление записей базы данных (поток 2)


6min
Orchestrator - Смотрит для коллекции работ (Ничего не найдено) (Тема 1)
7мина
Orchestrator - Смотрит для коллекции работ (Ничего не найдено) (Тема 1)

Однако я получаю Периодическую ошибку (см ниже) когда Orchestrator работает одновременно с LoadController.

5: 10: 40852 WARN [AbstractBatcher] исключения очистного maxRows/QueryTimeout java.sql.SQLException: Соединение не связанно с управляемым connection.org.jboss.resource.adapter.jdbc. JDK6.WrappedConnectionJDK6 @ 1fcdb21

брошена Эта ошибка после Orchestrator завершил Sql запроса и как LoadController пытается выполнить его следующий запрос.

Я сделал некоторые исследования. Я пришел к выводу, что EntityManager закрывается, поэтому LoadController не смог его использовать.

Теперь смущенный вопрос о том, что именно закрыло соединение, я сделал несколько базовых дампов объектов объекта управления сущностями, используемых Orchestrator и LoadController, когда каждый из компонентов вызван, и я обнаружил, что как раз перед тем, как получить полученную выше ошибку, случается.

2010-07-30 15: 06: 40804 ИНФО [processManagement.LoadController] (пул-15-нить-2) [email protected]

2010 -07-30 15: 10: 40758 ИНФО [processManagement.Orchestrator] (пул-15-нить-1) [email protected]

оказывается, что во время одного из исполнитель Orchestrator n интервалов, он получает ссылку на тот же EntityManager, который в настоящее время использует LoadController. Когда Orchestrator завершает выполнение SQL, он закрывает соединение, а LoadController больше не может выполнять свои обновления.

Так что я задаюсь вопросом, знает ли кто-нибудь об этом, или я получил свою резьбу, все испорченные в этом коде?

Из моего понимания, когда нагнетание EntityManager новый экземпляр впрыскивается из EntityManagerFactory, который остается с этим particualr объекта до объекта не оставляет сферу (в данном случае они являются лицами без гражданства, так когда старт() методы заканчивается), как мог один и тот же экземпляр менеджера сущности вводится в два отдельных потока?

Orchestrator.java

@Name("processOrchestrator") 
@Scope(ScopeType.APPLICATION) 
@AutoCreate 
public class Orchestrator { 

    //___________________________________________________________ 

    @Logger Log log; 

    @In EntityManager entityManager; 

    @In LoadController loadController; 

    @In WorkloadManager workloadManager; 

    //___________________________________________________________ 

    private int fProcessInstanceCount = 0; 

    //___________________________________________________________ 

    public Orchestrator() {} 

    //___________________________________________________________ 

    synchronized private void incrementProcessInstanceCount() { 
    fProcessInstanceCount++; 
    } 

    //___________________________________________________________ 

    synchronized private void decreaseProcessInstanceCount() { 
    fProcessInstanceCount--; 
    } 

    //___________________________________________________________ 

    @Observer("controllerExceptionEvent") 
    synchronized public void controllerExceptionListiner(Process aProcess, Exception aException) { 
    decreaseProcessInstanceCount(); 

    log.info(
     "Controller " + String.valueOf(aProcess) + 
     " failed with the error [" + aException.getMessage() + "]" 
    ); 

    Events.instance().raiseEvent(
     Application.ApplicationEvent.applicationExceptionEvent.name(), 
     aException, 
     Orchestrator.class 
    ); 
    } 

    //___________________________________________________________ 

    @Observer("controllerCompleteEvent") 
    synchronized public void successfulControllerCompleteListiner(Process aProcess, long aWorkloadId) { 
    try { 
     MisWorkload completedWorklaod = entityManager.find(MisWorkload.class, aWorkloadId); 
     workloadManager.completeWorkload(completedWorklaod); 
    } catch (Exception ex) { 
     log.error(ex.getMessage(), ex); 
    } 

    decreaseProcessInstanceCount(); 

    log.info("Controller " + String.valueOf(aProcess) + " completed successfuly"); 
    } 

    //___________________________________________________________ 

    @Asynchronous 
    public void startProcessingWorkloads(@IntervalDuration long interval) { 
    log.info("Polling for workloads."); 
    log.info(entityManager.toString()); 
    try { 
     MisWorkload pendingWorkload = workloadManager.getNextPendingWorkload(); 

     if (pendingWorkload != null) { 
     log.info(
      "Pending Workload found (Workload_Id = " + 
      String.valueOf(pendingWorkload.getWorkloadId()) + 
      "), starting process controller." 
     ); 

     Process aProcess = pendingWorkload.retriveProcessIdAsProcess(); 

     ControllerIntf controller = createWorkloadController(aProcess);   

     if (controller != null) { 
      controller.start(aProcess, pendingWorkload.getWorkloadId()); 
      workloadManager.setWorkloadProcessing(pendingWorkload); 
     } 
     } 

    } catch (Exception ex) { 
     Events.instance().raiseEvent(
     Application.ApplicationEvent.applicationExceptionEvent.name(), 
     ex, 
     Orchestrator.class 
    ); 
    } 

    log.info("Polling complete."); 
    } 

    //___________________________________________________________ 

    private ControllerIntf createWorkloadController(Process aProcess) { 
    ControllerIntf newController = null; 

    switch(aProcess) { 
     case LOAD: 
     newController = loadController; 
     break; 

     default: 
     log.info(
      "createWorkloadController() does not know the value (" + 
      aProcess.name() + 
      ") no controller will be started." 
     ); 
    } 

    // If a new controller is created than increase the 
    // count of started controllers so that we know how 
    // many are running. 
    if (newController != null) { 
     incrementProcessInstanceCount(); 
    } 

    return newController; 
    } 

    //___________________________________________________________ 

} 

LoadController.java

@Name("loadController") 
@Scope(ScopeType.STATELESS) 
@AutoCreate 
public class LoadController implements ControllerIntf { 
    //__________________________________________________ 

    @Logger private Log log; 

    @In private EntityManager entityManager; 

    //__________________________________________________ 

    private String fFileName = ""; 
    private String fNMDSFileName = ""; 
    private String fAddtFileName = ""; 

    //__________________________________________________ 

    public LoadController(){ } 
    //__________________________________________________ 

    @Asynchronous 
    synchronized public void start(Process aProcess, long aWorkloadId) { 
    log.info(
     LoadController.class.getName() + 
     " process thread was started for WorkloadId [" + 
     String.valueOf(aWorkloadId) + "]." 
    ); 
    log.info(entityManager.toString()); 
    try { 
     Query aQuery = entityManager.createQuery(
     "from MisLoad MIS_Load where Workload_Id = " + String.valueOf(aWorkloadId) 
    ); 

     MisLoad misLoadRecord = (MisLoad)aQuery.getSingleResult(); 

     fFileName = 
     misLoadRecord.getInitiatedBy().toUpperCase() + "_" + 
     misLoadRecord.getMdSourceSystem().getMdState().getShortName() + "_" + 
     DateUtils.now(DateUtils.FORMAT_FILE) + ".csv" 
     ; 

     fNMDSFileName = "NMDS_" + fFileName; 
     fAddtFileName = "Addt_" + fFileName; 

     createDataFile(misLoadRecord.getFileContents()); 

     ArrayList<String>sasCode = generateSASCode(
     misLoadRecord.getLoadId(), 
     misLoadRecord.getMdSourceSystem().getPreloadFile() 
    ); 

     //TODO: As the sas password will be encrypted in the database, we will 
     //  need to decrypt it before passing to the below function 
     executeLoadSASCode(
     sasCode, 
     misLoadRecord.getInitiatedBy(), 
     misLoadRecord.getSasPassword() 
    ); 

     createWorkloadContentRecords(aWorkloadId, misLoadRecord.getLoadId()); 

     //TODO: Needs to remove password from DB when complete 
     removeTempCSVFiles(); 

     Events.instance().raiseEvent(
     Application.ApplicationEvent.controllerCompleteEvent.name(), 
     aProcess, 
     aWorkloadId 
    ); 

     log.info(LoadController.class.getName() + " process thread completed."); 
    } catch (Exception ex) { 
     Events.instance().raiseEvent(
     Application.ApplicationEvent.controllerExceptionEvent.name(), 
     aProcess, 
     ex 
    ); 
    } 
    } 
    //__________________________________________________ 

    private void createDataFile(byte[] aFileContent) throws Exception { 
    File dataFile = 
     new File(ECEConfig.getConfiguration().sas_tempFileDir() + "\\" + fFileName); 

    FileUtils.writeBytesToFile(dataFile, aFileContent, true); 
    } 

    //__________________________________________________ 

    private ArrayList<String> generateSASCode(long aLoadId, String aSourceSystemPreloadSasFile) { 
    String sasTempDir = ECEConfig.getConfiguration().sas_tempFileDir(); 
    ArrayList<String> sasCode = new ArrayList<String>(); 

    sasCode.add("%let sOracleUserId = " + ECEConfig.getConfiguration().oracle_username() + ";"); 
    sasCode.add("%let sOraclePassword = " + ECEConfig.getConfiguration().oracle_password() + ";"); 
    sasCode.add("%let sOracleSID = " + ECEConfig.getConfiguration().oracle_sid() + ";"); 
    sasCode.add("%let sSchema = " + ECEConfig.getConfiguration().oracle_username() + ";"); 
    sasCode.add("%let sECESASSourceDir = " + ECEConfig.getConfiguration().sas_sourceDir() + ";");  
    sasCode.add("libname lOracle ORACLE user=&sOracleUserId pw=&sOraclePassword path=&sOracleSID schema=&sSchema;"); 

    sasCode.add("%let sCommaDelimiter = %str(" + ECEConfig.getConfiguration().dataload_csvRawDataFileDelimiter() + ");"); 
    sasCode.add("%let sPipeDelimiter = %nrquote(" + ECEConfig.getConfiguration().dataload_csvNMDSDataFileDelimiter() + ");"); 
    sasCode.add("%let sDataFileLocation = " + sasTempDir + "\\" + fFileName + ";"); 
    sasCode.add("%let sNMDSOutputDataFileLoc = " + sasTempDir + "\\" + fNMDSFileName + ";"); 
    sasCode.add("%let sAddtOutputDataFileLoc = " + sasTempDir + "\\" + fAddtFileName + ";"); 
    sasCode.add("%let iLoadId = " + String.valueOf(aLoadId) + ";"); 

    sasCode.add("%include \"&sECESASSourceDir\\ECE_UtilMacros.sas\";"); 
    sasCode.add("%include \"&sECESASSourceDir\\" + aSourceSystemPreloadSasFile + "\";"); 
    sasCode.add("%include \"&sECESASSourceDir\\ECE_NMDSLoad.sas\";"); 
    sasCode.add("%preload(&sDataFileLocation, &sCommaDelimiter, &sNMDSOutputDataFileLoc, &sAddtOutputDataFileLoc, &sPipeDelimiter);"); 
    sasCode.add("%loadNMDS(lOracle, &sNMDSOutputDataFileLoc, &sAddtOutputDataFileLoc, &sPipeDelimiter, &iLoadId);"); 

    return sasCode; 
    } 

    //__________________________________________________ 

    private void executeLoadSASCode(
    ArrayList<String> aSasCode, String aUserName, String aPassword) throws Exception 
    { 
    SASExecutor aSASExecutor = new SASExecutor(
     ECEConfig.getConfiguration().sas_server(), 
     ECEConfig.getConfiguration().sas_port(), 
     aUserName, 
     aPassword 
    ); 

    aSASExecutor.execute(aSasCode); 

    log.info(aSASExecutor.getCompleteSasLog()); 
    } 
    //__________________________________________________ 

    /** 
    * Creates the MIS_UR_Workload_Contents records for 
    * the ECE Unit Record data that was just loaded 
    * 
    * @param aWorkloadId 
    * @param aMisLoadId 
    * @throws Exception 
    */ 

    private void createWorkloadContentRecords(long aWorkloadId, long aMisLoadId) throws Exception { 

    String selectionRule = 
     " from EceUnitRecord ECE_Unit_Record where ECE_Unit_Record.loadId = " + 
     String.valueOf(aMisLoadId) 
    ; 
    MisWorkload misWorkload = entityManager.find(MisWorkload.class, aWorkloadId); 
    SeamManualTransaction manualTx = new SeamManualTransaction(
     entityManager, 
     ECEConfig.getConfiguration().manualSeamTxTimeLimit() 
    ); 
    manualTx.begin(); 
    RecordPager oPager = new RecordPager(
     entityManager, 
     selectionRule, 
     ECEConfig.getConfiguration().recordPagerDefaultPageSize() 
    ); 

    Object nextRecord = null; 

    while ((nextRecord = oPager.getNextRecord()) != null) { 
     EceUnitRecord aEceUnitRecord = (EceUnitRecord)nextRecord; 

     MisUrWorkloadContents aContentsRecord = new MisUrWorkloadContents(); 

     aContentsRecord.setEceUnitRecordId(aEceUnitRecord.getEceUnitRecordId()); 
     aContentsRecord.setMisWorkload(misWorkload); 
     aContentsRecord.setProcessOutcome('C'); 

     entityManager.persist(aContentsRecord); 
    } 

    manualTx.commit(); 
    } 

    /** 
    * Removes the CSV temp files that are created for input 
    * into the SAS server and that are created as output. 
    */ 

    private void removeTempCSVFiles() { 
    String sasTempDir = ECEConfig.getConfiguration().sas_tempFileDir(); 
    File dataInputCSV = new File(sasTempDir + "\\" + fFileName); 
    File nmdsOutputCSV = new File(sasTempDir + "\\" + fNMDSFileName); 
    File addtOutputCSV = new File(sasTempDir + "\\" + fAddtFileName); 

    if (dataInputCSV.exists()) { 
     dataInputCSV.delete(); 
    } 
    if (nmdsOutputCSV.exists()) { 
     nmdsOutputCSV.delete(); 
    } 

    if (addtOutputCSV.exists()) { 
     addtOutputCSV.delete(); 
    } 
    } 
} 

SeamManualTransaction.java

public class SeamManualTransaction { 

    //___________________________________________________________ 

    private boolean fObjectUsed = false; 
    private boolean fJoinExistingTransaction = true; 
    private int fTransactionTimeout = 60; // Default: 60 seconds 
    private UserTransaction fUserTx; 
    private EntityManager fEntityManager; 

    //___________________________________________________________ 

    /** 
    * Set the transaction timeout in milliseconds (from minutes) 
    * 
    * @param aTimeoutInMins The number of minutes to keep the transaction active 
    */ 

    private void setTransactionTimeout(int aTimeoutInSecs) { 
    // 60 * aTimeoutInSecs = Timeout in Seconds 
    fTransactionTimeout = 60 * aTimeoutInSecs; 
    } 

    //___________________________________________________________ 

    /** 
    * Constructor 
    * 
    * @param aEntityManager 
    */ 

    public SeamManualTransaction(EntityManager aEntityManager) { 
    fEntityManager = aEntityManager; 
    } 

    //___________________________________________________________ 

    /** 
    * Constructor 
    * 
    * @param aEntityManager 
    * @param aTimeoutInSecs 
    */ 

    public SeamManualTransaction(EntityManager aEntityManager, int aTimeoutInSecs) { 
    setTransactionTimeout(aTimeoutInSecs); 
    fEntityManager = aEntityManager; 
    } 

    //___________________________________________________________ 

    /** 
    * Constructor 
    * 
    * @param aEntityManager 
    * @param aTimeoutInSecs 
    * @param aJoinExistingTransaction 
    */ 
    public SeamManualTransaction(EntityManager aEntityManager, int aTimeoutInSecs, boolean aJoinExistingTransaction) { 
    setTransactionTimeout(aTimeoutInSecs); 
    fJoinExistingTransaction = aJoinExistingTransaction; 
    fEntityManager = aEntityManager; 
    } 

    //___________________________________________________________ 

    /** 
    * Starts the new transaction 
    * 
    * @throws Exception 
    */ 

    public void begin() throws Exception { 
    if (fObjectUsed) { 
     throw new Exception(
     SeamManualTransaction.class.getCanonicalName() + 
     " has been used. Create new instance." 
    ); 
    } 

    fUserTx = 
     (UserTransaction) org.jboss.seam.Component.getInstance("org.jboss.seam.transaction.transaction"); 

    fUserTx.setTransactionTimeout(fTransactionTimeout); 
    fUserTx.begin(); 

    /* If entity manager is created before the transaction 
    * is started (ie. via Injection) then it must join the 
    * transaction 
    */ 
    if (fJoinExistingTransaction) { 
     fEntityManager.joinTransaction(); 
    } 
    } 

    //___________________________________________________________ 

    /** 
    * Commit the transaction to the database 
    * 
    * @throws Exception 
    */ 

    public void commit() throws Exception { 
    fObjectUsed = true; 
    fUserTx.commit(); 
    } 

// ___________________________________________________________

/** * Рулеты сделка назад * * @throws Exception */

общественности недействительным откат() бросает исключение { fObjectUsed = истина; fUserTx.откат(); }

// ___________________________________________________________ }

ответ

0

В общем, нагнетании EntityManager в пластах компоненте области видимости приложения не является правильным. Диспетчер объектов - это то, что вы создаете, используете и закрываете снова, в области, которая обычно намного короче, чем область APPLICATION.

Улучшение путем выбора небольших областей с помощью стандартной сущности entityManager или если вам нужна область применения, вместо этого введи EntityManagerFactory и создайте, используйте и закройте entityManager самостоятельно.

Посмотрите на свой сайт Seam Components.xml, чтобы найти имя вашего компонента EntityManagerFactory.

+0

Спасибо, Пол, мне удалось найти основную причину этой проблемы, и да, это было частично связано с использованием EntityManager в области приложения. Я переработал код сейчас, так что класс Orchestrator вообще не использует EntityManager, но вместо этого EntityManagers используются в фазах без состояния. – Scott

0

Ну, мой первый совет является

Если вы используете приложение EJB, предпочитают использовать Bean Managed Transaction вместо пользовательских SeamManualTransaction. Когда вы используете транзакцию, управляемую Bean, вы, как разработчик, заботитесь о вызове begin и commit. Вы получаете эту функцию с помощью компонента UserTransaction. Вы можете создать слой «Фасад», который начинается и совершает транзакцию. Что-то вроде

/** 
    * default scope when using @Stateless session bean is ScopeType.STATELESS 
    * 
    * So you do not need to declare @Scope(ScopeType.STATELESS) anymore 
    * 
    * A session bean can not use both BEAN and CONTAINER Transaction management at The same Time 
    */ 
@Stateless 
@Name("businessFacade") 
@TransactionManagement(TransactionManagerType.BEAN) 
public class BusinessFacade implements BusinessFacadeLocal { 

    private @Resource TimerService timerService; 
    private @Resource UserTransaction userTransaction; 
    /** 
     * You can use @In of you are using Seam capabilities 
     */ 
    private @PersistenceContext entityManager; 

    public void doSomething() { 
     try { 
      userTransaction.begin(); 
      userTransaction.setTransactionTimeout(int seconds); 

      // business logic goes here 

      /** 
       * To enable your Timer service, just call 
       * 
       * timerService.createTimer(15*60*1000, 15*60*1000, <ANY_SERIALIZABLE_INFO_GOES_HERE>); 
       */ 

      userTransaction.commit(); 
     } catch (Exception e) { 
      userTransaction.rollback(); 
     } 
    } 

    @Timeout 
    public void doTimer(Timer timer) { 
     try { 
      userTransaction.begin();   

      timer.getInfo(); 

      // logic goes here 

      userTransaction.commit(); 
     } catch (Exception e) { 
      userTransaction.rollback(); 
     } 
    } 

} 

Давайте посмотрим UserTransaction.begin метод API

Создать новую транзакцию и ассоциировать его с текущим потоком

Существует еще:

Время жизни контекст персистентности управляемого контейнера (впрыскивается через @PersistenceContext аннотации) соответствует в рамки транзакции (между началом и фиксацией вызова метода) при использовании транзакций в области видимости контекстого сохранения

сейчас Давайте посмотрим TimerService

это контейнер, предоставляемый сервис, который позволяет корпоративные компоненты должны быть зарегистрированы для методов таймера обратного вызова происходит при определенное время, через указанное истекшее время или через определенные промежутки времени. Класс bean-компонента корпоративного компонента , который использует таймер , должен предоставить метод обратного вызова времени ожидания. Таймеры может быть созданы для сеансных, управляемых сообщений бобов

Я надеюсь, что это может быть полезным для вас

+0

Спасибо Артур. Ваш пост заставил меня сделать одно, перечитать мой код. В методе LoadController.createWorkloadContentRecords() я обнаружил SeamManualTransaction, который не требуется из-за рефакторинга. Я удалил это, и теперь мой вопрос пропал (пальцы скрещены).Однако представляется странным, что один и тот же экземпляр может быть введен в два потока, пока он не используется. – Scott