Я новичок в способе разработки Java/Hibernate/Seam, но у меня возникла странная проблема с Hibernate и параллельными потоками.Seam Hibernate Служит одному экземпляру EntityManger для двух отдельных потоков
У меня есть компонент Scope, связанный с приложением, который выполняется через таймеры EJB с заданным интервалом (Orchestrator.java), вызывающим метод startProcessingWorkloads.
Этого метод имеет впрыскиваемый EntityManager, который он использует для проверки базы данных для сбора данных, и если он находит коллекцию работы она создает новый асинхронный Seam компонент (LoadContoller.java) и выполняет старта() метод на контроллере
LoadController имеет EntityManager впрыскиваемого и использовать его для выполнения очень большой сделки (около одного часа)
После того, как LoadController работает как отдельный поток, то Orchestrator все еще выполнен в виде нити на заданный интервал, например
1min
Orchestrator - выглядит для сбора работы (Ничего не найдено) (нить 1)
2мин
Orchestrator - выглядит для сбора работы (находит, начинает LoadController) (тема 1)
LoadController - Запуск обновления базы данных записей (нить 2)
3min
Orchestrator - Смотрит для сбора работ (None Found) (Тема 1)
LoadController - Тем не менее обновления записей базы данных (Тема 2)
4min
Orchestrator - Смотрит для коллекции работ (Ничего не найдено) (Тема 1)
LoadController - Тем не менее обновление записей базы данных (Тема 2)
5мин
Orchestrator - Похоже на работу коллекция (Ничего не найдено) (Тема 1)
LoadController - Готово обновление записей базы данных (поток 2)
6min
Orchestrator - Смотрит для коллекции работ (Ничего не найдено) (Тема 1)
7мина
Orchestrator - Смотрит для коллекции работ (Ничего не найдено) (Тема 1)
Однако я получаю Периодическую ошибку (см ниже) когда Orchestrator работает одновременно с LoadController.
5: 10: 40852 WARN [AbstractBatcher] исключения очистного maxRows/QueryTimeout java.sql.SQLException: Соединение не связанно с управляемым connection.org.jboss.resource.adapter.jdbc. JDK6.WrappedConnectionJDK6 @ 1fcdb21
брошена Эта ошибка после Orchestrator завершил Sql запроса и как LoadController пытается выполнить его следующий запрос.
Я сделал некоторые исследования. Я пришел к выводу, что EntityManager закрывается, поэтому LoadController не смог его использовать.
Теперь смущенный вопрос о том, что именно закрыло соединение, я сделал несколько базовых дампов объектов объекта управления сущностями, используемых Orchestrator и LoadController, когда каждый из компонентов вызван, и я обнаружил, что как раз перед тем, как получить полученную выше ошибку, случается.
2010-07-30 15: 06: 40804 ИНФО [processManagement.LoadController] (пул-15-нить-2) [email protected]
2010 -07-30 15: 10: 40758 ИНФО [processManagement.Orchestrator] (пул-15-нить-1) [email protected]
оказывается, что во время одного из исполнитель Orchestrator n интервалов, он получает ссылку на тот же EntityManager, который в настоящее время использует LoadController. Когда Orchestrator завершает выполнение SQL, он закрывает соединение, а LoadController больше не может выполнять свои обновления.
Так что я задаюсь вопросом, знает ли кто-нибудь об этом, или я получил свою резьбу, все испорченные в этом коде?
Из моего понимания, когда нагнетание EntityManager новый экземпляр впрыскивается из EntityManagerFactory, который остается с этим particualr объекта до объекта не оставляет сферу (в данном случае они являются лицами без гражданства, так когда старт() методы заканчивается), как мог один и тот же экземпляр менеджера сущности вводится в два отдельных потока?
Orchestrator.java
@Name("processOrchestrator")
@Scope(ScopeType.APPLICATION)
@AutoCreate
public class Orchestrator {
//___________________________________________________________
@Logger Log log;
@In EntityManager entityManager;
@In LoadController loadController;
@In WorkloadManager workloadManager;
//___________________________________________________________
private int fProcessInstanceCount = 0;
//___________________________________________________________
public Orchestrator() {}
//___________________________________________________________
synchronized private void incrementProcessInstanceCount() {
fProcessInstanceCount++;
}
//___________________________________________________________
synchronized private void decreaseProcessInstanceCount() {
fProcessInstanceCount--;
}
//___________________________________________________________
@Observer("controllerExceptionEvent")
synchronized public void controllerExceptionListiner(Process aProcess, Exception aException) {
decreaseProcessInstanceCount();
log.info(
"Controller " + String.valueOf(aProcess) +
" failed with the error [" + aException.getMessage() + "]"
);
Events.instance().raiseEvent(
Application.ApplicationEvent.applicationExceptionEvent.name(),
aException,
Orchestrator.class
);
}
//___________________________________________________________
@Observer("controllerCompleteEvent")
synchronized public void successfulControllerCompleteListiner(Process aProcess, long aWorkloadId) {
try {
MisWorkload completedWorklaod = entityManager.find(MisWorkload.class, aWorkloadId);
workloadManager.completeWorkload(completedWorklaod);
} catch (Exception ex) {
log.error(ex.getMessage(), ex);
}
decreaseProcessInstanceCount();
log.info("Controller " + String.valueOf(aProcess) + " completed successfuly");
}
//___________________________________________________________
@Asynchronous
public void startProcessingWorkloads(@IntervalDuration long interval) {
log.info("Polling for workloads.");
log.info(entityManager.toString());
try {
MisWorkload pendingWorkload = workloadManager.getNextPendingWorkload();
if (pendingWorkload != null) {
log.info(
"Pending Workload found (Workload_Id = " +
String.valueOf(pendingWorkload.getWorkloadId()) +
"), starting process controller."
);
Process aProcess = pendingWorkload.retriveProcessIdAsProcess();
ControllerIntf controller = createWorkloadController(aProcess);
if (controller != null) {
controller.start(aProcess, pendingWorkload.getWorkloadId());
workloadManager.setWorkloadProcessing(pendingWorkload);
}
}
} catch (Exception ex) {
Events.instance().raiseEvent(
Application.ApplicationEvent.applicationExceptionEvent.name(),
ex,
Orchestrator.class
);
}
log.info("Polling complete.");
}
//___________________________________________________________
private ControllerIntf createWorkloadController(Process aProcess) {
ControllerIntf newController = null;
switch(aProcess) {
case LOAD:
newController = loadController;
break;
default:
log.info(
"createWorkloadController() does not know the value (" +
aProcess.name() +
") no controller will be started."
);
}
// If a new controller is created than increase the
// count of started controllers so that we know how
// many are running.
if (newController != null) {
incrementProcessInstanceCount();
}
return newController;
}
//___________________________________________________________
}
LoadController.java
@Name("loadController")
@Scope(ScopeType.STATELESS)
@AutoCreate
public class LoadController implements ControllerIntf {
//__________________________________________________
@Logger private Log log;
@In private EntityManager entityManager;
//__________________________________________________
private String fFileName = "";
private String fNMDSFileName = "";
private String fAddtFileName = "";
//__________________________________________________
public LoadController(){ }
//__________________________________________________
@Asynchronous
synchronized public void start(Process aProcess, long aWorkloadId) {
log.info(
LoadController.class.getName() +
" process thread was started for WorkloadId [" +
String.valueOf(aWorkloadId) + "]."
);
log.info(entityManager.toString());
try {
Query aQuery = entityManager.createQuery(
"from MisLoad MIS_Load where Workload_Id = " + String.valueOf(aWorkloadId)
);
MisLoad misLoadRecord = (MisLoad)aQuery.getSingleResult();
fFileName =
misLoadRecord.getInitiatedBy().toUpperCase() + "_" +
misLoadRecord.getMdSourceSystem().getMdState().getShortName() + "_" +
DateUtils.now(DateUtils.FORMAT_FILE) + ".csv"
;
fNMDSFileName = "NMDS_" + fFileName;
fAddtFileName = "Addt_" + fFileName;
createDataFile(misLoadRecord.getFileContents());
ArrayList<String>sasCode = generateSASCode(
misLoadRecord.getLoadId(),
misLoadRecord.getMdSourceSystem().getPreloadFile()
);
//TODO: As the sas password will be encrypted in the database, we will
// need to decrypt it before passing to the below function
executeLoadSASCode(
sasCode,
misLoadRecord.getInitiatedBy(),
misLoadRecord.getSasPassword()
);
createWorkloadContentRecords(aWorkloadId, misLoadRecord.getLoadId());
//TODO: Needs to remove password from DB when complete
removeTempCSVFiles();
Events.instance().raiseEvent(
Application.ApplicationEvent.controllerCompleteEvent.name(),
aProcess,
aWorkloadId
);
log.info(LoadController.class.getName() + " process thread completed.");
} catch (Exception ex) {
Events.instance().raiseEvent(
Application.ApplicationEvent.controllerExceptionEvent.name(),
aProcess,
ex
);
}
}
//__________________________________________________
private void createDataFile(byte[] aFileContent) throws Exception {
File dataFile =
new File(ECEConfig.getConfiguration().sas_tempFileDir() + "\\" + fFileName);
FileUtils.writeBytesToFile(dataFile, aFileContent, true);
}
//__________________________________________________
private ArrayList<String> generateSASCode(long aLoadId, String aSourceSystemPreloadSasFile) {
String sasTempDir = ECEConfig.getConfiguration().sas_tempFileDir();
ArrayList<String> sasCode = new ArrayList<String>();
sasCode.add("%let sOracleUserId = " + ECEConfig.getConfiguration().oracle_username() + ";");
sasCode.add("%let sOraclePassword = " + ECEConfig.getConfiguration().oracle_password() + ";");
sasCode.add("%let sOracleSID = " + ECEConfig.getConfiguration().oracle_sid() + ";");
sasCode.add("%let sSchema = " + ECEConfig.getConfiguration().oracle_username() + ";");
sasCode.add("%let sECESASSourceDir = " + ECEConfig.getConfiguration().sas_sourceDir() + ";");
sasCode.add("libname lOracle ORACLE user=&sOracleUserId pw=&sOraclePassword path=&sOracleSID schema=&sSchema;");
sasCode.add("%let sCommaDelimiter = %str(" + ECEConfig.getConfiguration().dataload_csvRawDataFileDelimiter() + ");");
sasCode.add("%let sPipeDelimiter = %nrquote(" + ECEConfig.getConfiguration().dataload_csvNMDSDataFileDelimiter() + ");");
sasCode.add("%let sDataFileLocation = " + sasTempDir + "\\" + fFileName + ";");
sasCode.add("%let sNMDSOutputDataFileLoc = " + sasTempDir + "\\" + fNMDSFileName + ";");
sasCode.add("%let sAddtOutputDataFileLoc = " + sasTempDir + "\\" + fAddtFileName + ";");
sasCode.add("%let iLoadId = " + String.valueOf(aLoadId) + ";");
sasCode.add("%include \"&sECESASSourceDir\\ECE_UtilMacros.sas\";");
sasCode.add("%include \"&sECESASSourceDir\\" + aSourceSystemPreloadSasFile + "\";");
sasCode.add("%include \"&sECESASSourceDir\\ECE_NMDSLoad.sas\";");
sasCode.add("%preload(&sDataFileLocation, &sCommaDelimiter, &sNMDSOutputDataFileLoc, &sAddtOutputDataFileLoc, &sPipeDelimiter);");
sasCode.add("%loadNMDS(lOracle, &sNMDSOutputDataFileLoc, &sAddtOutputDataFileLoc, &sPipeDelimiter, &iLoadId);");
return sasCode;
}
//__________________________________________________
private void executeLoadSASCode(
ArrayList<String> aSasCode, String aUserName, String aPassword) throws Exception
{
SASExecutor aSASExecutor = new SASExecutor(
ECEConfig.getConfiguration().sas_server(),
ECEConfig.getConfiguration().sas_port(),
aUserName,
aPassword
);
aSASExecutor.execute(aSasCode);
log.info(aSASExecutor.getCompleteSasLog());
}
//__________________________________________________
/**
* Creates the MIS_UR_Workload_Contents records for
* the ECE Unit Record data that was just loaded
*
* @param aWorkloadId
* @param aMisLoadId
* @throws Exception
*/
private void createWorkloadContentRecords(long aWorkloadId, long aMisLoadId) throws Exception {
String selectionRule =
" from EceUnitRecord ECE_Unit_Record where ECE_Unit_Record.loadId = " +
String.valueOf(aMisLoadId)
;
MisWorkload misWorkload = entityManager.find(MisWorkload.class, aWorkloadId);
SeamManualTransaction manualTx = new SeamManualTransaction(
entityManager,
ECEConfig.getConfiguration().manualSeamTxTimeLimit()
);
manualTx.begin();
RecordPager oPager = new RecordPager(
entityManager,
selectionRule,
ECEConfig.getConfiguration().recordPagerDefaultPageSize()
);
Object nextRecord = null;
while ((nextRecord = oPager.getNextRecord()) != null) {
EceUnitRecord aEceUnitRecord = (EceUnitRecord)nextRecord;
MisUrWorkloadContents aContentsRecord = new MisUrWorkloadContents();
aContentsRecord.setEceUnitRecordId(aEceUnitRecord.getEceUnitRecordId());
aContentsRecord.setMisWorkload(misWorkload);
aContentsRecord.setProcessOutcome('C');
entityManager.persist(aContentsRecord);
}
manualTx.commit();
}
/**
* Removes the CSV temp files that are created for input
* into the SAS server and that are created as output.
*/
private void removeTempCSVFiles() {
String sasTempDir = ECEConfig.getConfiguration().sas_tempFileDir();
File dataInputCSV = new File(sasTempDir + "\\" + fFileName);
File nmdsOutputCSV = new File(sasTempDir + "\\" + fNMDSFileName);
File addtOutputCSV = new File(sasTempDir + "\\" + fAddtFileName);
if (dataInputCSV.exists()) {
dataInputCSV.delete();
}
if (nmdsOutputCSV.exists()) {
nmdsOutputCSV.delete();
}
if (addtOutputCSV.exists()) {
addtOutputCSV.delete();
}
}
}
SeamManualTransaction.java
public class SeamManualTransaction {
//___________________________________________________________
private boolean fObjectUsed = false;
private boolean fJoinExistingTransaction = true;
private int fTransactionTimeout = 60; // Default: 60 seconds
private UserTransaction fUserTx;
private EntityManager fEntityManager;
//___________________________________________________________
/**
* Set the transaction timeout in milliseconds (from minutes)
*
* @param aTimeoutInMins The number of minutes to keep the transaction active
*/
private void setTransactionTimeout(int aTimeoutInSecs) {
// 60 * aTimeoutInSecs = Timeout in Seconds
fTransactionTimeout = 60 * aTimeoutInSecs;
}
//___________________________________________________________
/**
* Constructor
*
* @param aEntityManager
*/
public SeamManualTransaction(EntityManager aEntityManager) {
fEntityManager = aEntityManager;
}
//___________________________________________________________
/**
* Constructor
*
* @param aEntityManager
* @param aTimeoutInSecs
*/
public SeamManualTransaction(EntityManager aEntityManager, int aTimeoutInSecs) {
setTransactionTimeout(aTimeoutInSecs);
fEntityManager = aEntityManager;
}
//___________________________________________________________
/**
* Constructor
*
* @param aEntityManager
* @param aTimeoutInSecs
* @param aJoinExistingTransaction
*/
public SeamManualTransaction(EntityManager aEntityManager, int aTimeoutInSecs, boolean aJoinExistingTransaction) {
setTransactionTimeout(aTimeoutInSecs);
fJoinExistingTransaction = aJoinExistingTransaction;
fEntityManager = aEntityManager;
}
//___________________________________________________________
/**
* Starts the new transaction
*
* @throws Exception
*/
public void begin() throws Exception {
if (fObjectUsed) {
throw new Exception(
SeamManualTransaction.class.getCanonicalName() +
" has been used. Create new instance."
);
}
fUserTx =
(UserTransaction) org.jboss.seam.Component.getInstance("org.jboss.seam.transaction.transaction");
fUserTx.setTransactionTimeout(fTransactionTimeout);
fUserTx.begin();
/* If entity manager is created before the transaction
* is started (ie. via Injection) then it must join the
* transaction
*/
if (fJoinExistingTransaction) {
fEntityManager.joinTransaction();
}
}
//___________________________________________________________
/**
* Commit the transaction to the database
*
* @throws Exception
*/
public void commit() throws Exception {
fObjectUsed = true;
fUserTx.commit();
}
// ___________________________________________________________
/** * Рулеты сделка назад * * @throws Exception */
общественности недействительным откат() бросает исключение { fObjectUsed = истина; fUserTx.откат(); }
// ___________________________________________________________ }
Спасибо, Пол, мне удалось найти основную причину этой проблемы, и да, это было частично связано с использованием EntityManager в области приложения. Я переработал код сейчас, так что класс Orchestrator вообще не использует EntityManager, но вместо этого EntityManagers используются в фазах без состояния. – Scott