3

Я уверен, что соединения в режиме ожидания не используются повторно или я испытываю утечку соединений. У меня есть простой маршрут, который начинается с пользователя файла. Потребитель файлов потребляет текстовые файлы. После сбора файла я проверяю таблицу, чтобы убедиться, что это не дубликат файла.DBCP Idle Connections не используется повторно в Camel Route

Затем я преобразую тело сообщения из файла в строку. Затем я разбил файл и запускал отдельные фрагменты по маршруту в зависимости от типа записи. Каждый из этих маршрутов в конечном итоге вставляет эту запись в промежуточную таблицу на сервере, работающем на MySQL.

Ниже представлена ​​упрощенная версия маршрута.

<bean id="myPool" class="java.util.concurrent.Executors" factory-method="newFixedThreadPool"> 
     <argument index="0" value="8"/> 
</bean> 
<camelContext trace="false" handleFault="true" errorHandlerRef="redeliveryErrorHandler" id="FileETLProcess" xmlns="http://camel.apache.org/schema/blueprint"> 
<errorHandler type="DefaultErrorHandler" useOriginalMessage="true" id="redeliveryErrorHandler"> 
    <redeliveryPolicy maximumRedeliveries="3" redeliveryDelay="25" backOffMultiplier="2" useExponentialBackOff="false" retryAttemptedLogLevel="INFO"/> 
</errorHandler> 
<onException useOriginalMessage="true"> 
    <exception>java.lang.Exception</exception> 
    <handled> 
     <constant>true</constant> 
    </handled> 
    <log message="STARTING ERROR GENERAL HANDLER"/> 
    <bean ref="GeneralError"/> 
    <to uri="smtp://[email protected]&amp;[email protected]&amp;subject=GENERAL ERROR: A File Could Not Be Imported&amp;contentType=text/html"/> 
    <to uri="file:d:/Inbox/.badFile?fileName=${file:onlyname.noext}_GENERALERROR_${date:now:yyyyMMddHHmmss}.${file:name.ext}"/> 
</onException> 
<route id="ExtractFileRoute"> 
    <from uri="file:d:/Inbox?delay=10000&amp;move=.donebackup/${date:now:yyyyMMdd}/${file:onlyname.noext}_DONE_${date:now:yyyyMMddHHmmss}.${file:name.ext}&amp;readLock=changed&amp;include=.*.dl&amp;maxMessagesPerPoll=0&amp;sortBy=${file:length}"/> 
    <bean ref="FileCheck"/> 
    <choice> 
     <when> 
      <simple>${header.ACCEPTEDFILE} == 'YES'</simple> 
      <log message="File Import Route Started At:${date:now:yyyy-MM-dd HH:mm:ss}"/> 
      <convertBodyTo type="java.lang.String"/> 
      <log message="Converted File To String:${date:now:yyyy-MM-dd HH:mm:ss} handing data to File To DB route."/> 
      <split parallelProcessing="true" executorServiceRef="myPool" streaming="true" shareUnitOfWork="true"> 
       <tokenize token="\n"></tokenize> 
       <setHeader headerName="SPLITFINISHED"> 
        <simple>${property.CamelSplitComplete}</simple> 
       </setHeader> 
       <setHeader headerName="SPLITNUMBER"> 
        <simple>${property.CamelSplitIndex}</simple> 
       </setHeader> 
       <bean ref="EnrichHeader"/> 
       <choice> 
        <when> 
         <simple>${header.RECORDTYPE} == 'HEADER'</simple> 
         <doTry> 
          <unmarshal ref="bindyHeader"/> 
          <bean ref="HeaderPersist"/> 
          <choice> 
           <when> 
            <simple>${property.CamelSplitComplete} == true</simple> 
            <to uri="direct:auxrecordsmove"/> 
           </when> 
          </choice> 
          <doCatch> 
           <exception>java.lang.Exception</exception> 
           <handled> 
            <constant>true</constant> 
           </handled> 
           <bean ref="RecordErrorReport"/> 
           <choice> 
            <when> 
             <simple>${property.CamelSplitComplete} == true</simple> 
             <to uri="direct:auxrecordsmove"/> 
            </when> 
           </choice> 
          </doCatch> 
         </doTry> 
        </when> 
        <when> 
         <simple>${header.RECORDTYPE} == 'A'</simple> 
         <doTry> 
          <unmarshal ref="bindyAccount"/> 
          <bean ref="AccountPersist"/> 
          <choice> 
           <when> 
            <simple>${property.CamelSplitComplete} == true</simple> 
            <to uri="direct:auxrecordsmove"/> 
           </when> 
          </choice> 
          <doCatch> 
           <exception>java.lang.Exception</exception> 
           <handled> 
            <constant>true</constant> 
           </handled> 
           <bean ref="RecordErrorReport"/> 
           <choice> 
            <when> 
             <simple>${property.CamelSplitComplete} == true</simple> 
             <to uri="direct:auxrecordsmove"/> 
            </when> 
           </choice> 
          </doCatch> 
         </doTry> 
        </when> 
        <when> 
         <simple>${header.RECORDTYPE} == 'C'</simple> 
         <doTry> 
          <unmarshal ref="bindyComaker"/> 
          <bean ref="CoMakerPersist"/> 
          <choice> 
           <when> 
            <simple>${property.CamelSplitComplete} == true</simple> 
            <to uri="direct:auxrecordsmove"/> 
           </when> 
          </choice> 
          <doCatch> 
           <exception>java.lang.Exception</exception> 
           <handled> 
            <constant>true</constant> 
           </handled> 
           <bean ref="RecordErrorReport"/> 
           <choice> 
            <when> 
             <simple>${property.CamelSplitComplete} == true</simple> 
             <to uri="direct:auxrecordsmove"/> 
            </when> 
           </choice> 
          </doCatch> 
         </doTry> 
        </when> 
        Some other beans here........ 
        <when> 
         <simple>${property.CamelSplitComplete} == true</simple> 
         <to uri="direct:auxrecordsmove"/> 
        </when> 
        <otherwise> 
         <to uri="smtp://[email protected]&amp;[email protected]&amp;subject=URGENT:UNKOWN RECORD TYPE FOUND IN FILE"/> 
         <choice> 
          <when> 
           <simple>${property.CamelSplitComplete} == true</simple> 
           <to uri="direct:auxrecordsmove"/> 
          </when> 
         </choice> 
        </otherwise> 
       </choice> 
      </split> 
     </when> 
     <otherwise> 
      <to uri="file:d:/RMSInbox/.badFile?fileName=${file:onlyname.noext}_POSSIBLE_DUPLICATE_ERROR_${date:now:yyyyMMddHHmmss}.${file:name.ext}"/> 
      <bean ref="FileErrorReport"/> 
      <to uri="smtp://[email protected]&amp;[email protected]&amp;subject=ERROR: A File Could Not Be Imported&amp;contentType=text/html"/> 
     </otherwise> 
    </choice> 
</route> 

Таким образом, каждое сообщение на этом пути в конечном итоге попадает в компонент, который будет вставлять его в базу данных. Поэтому я добавил ГСБД зависимостей, а затем объявить его в моем OSGi XML план следующим образом:

<bean id="dataSource" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close"> 
    <property name="driverClassName" value="com.mysql.jdbc.Driver"/> 
    <property name="url" value="jdbc:mysql://Canttouchthis:3306/ETLDB"/> 
    <property name="username" value="ETLUser"/> 
    <property name="password" value="password"/> 
    <property name="initialSize" value="2"/> 
    <property name="maxActive" value="16"/> 
    <property name="maxIdle" value="16"/> 
    <property name="minIdle" value="2"/> 
    <property name="timeBetweenEvictionRunsMillis" value="180000"/> 
    <property name="minEvictableIdleTimeMillis" value="180000"/> 
    <property name="testOnBorrow" value="true"/> 
    <property name="testWhileIdle" value="true"/> 
    <property name="testOnReturn" value="true"/> 
    <property name="validationQuery" value="SELECT 1"/> 
    <property name="maxWait" value="10000"/> 
    <property name="removeAbandoned" value="true"/> 
    <property name="logAbandoned" value="false"/> 
    <property name="removeAbandonedTimeout" value="300"/> 
</bean> 

Я также заявляю, мои бобы, которые будут делать обработку, как это:

<bean id="AccountPersist" class="com.foo.NewAccount.AccountInformationToDatabase"> 
    <property name="dataSource" ref="dataSource"/> 
</bean> 

Теперь, когда раскол по файлу закончен, я хочу убедиться, что записи совпадают. В основном файл имеет записи учетной записи и некоторую вспомогательную информацию. Поэтому я проверяю маршруты, когда раскол закончен, а затем, как только файл полностью находится в промежуточных таблицах, я запускаю дополнительные проверки работоспособности в MySQL.

Это второй маршрут выглядит примерно так:

<route trace="false" id="MoveMatchingAuxRecordsFromStage"> 
    <from uri="direct:auxrecordsmove"/> 
    <log message="File Import Route Ended At:${date:now:yyyy-MM-dd HH:mm:ss}"/> 
    <log message="ETL Route Start AT: ${date:now:yyyy-MM-dd HH:mm:ss}"/> 
    <log message="Moving Matching Comaker records at: ${date:now:yyyy-MM-dd HH:mm:ss}"/> 
    <bean ref="CoMakerETL"/> 
    <log message="Matching Comaker records move finised at: ${date:now:yyyy-MM-dd HH:mm:ss}"/> 
    <log message="Moving Matching Credit History records at: ${date:now:yyyy-MM-dd HH:mm:ss}"/> 
    <bean ref="CreditHistoryETL"/> 
    <log message="Matching Credit History records move finised at: ${date:now:yyyy-MM-dd HH:mm:ss}"/> 
    <log message="Moving Matching Extra Detail records at: ${date:now:yyyy-MM-dd HH:mm:ss}"/> 
    <bean ref="ExtraDetailInformationETL"/> 
    <log message="Matching Extra Detail records move finised at: ${date:now:yyyy-MM-dd HH:mm:ss}"/> 
    <log message="Moving Legal Information records at: ${date:now:yyyy-MM-dd HH:mm:ss}"/> 

    <bean ref="LegalInformationETL"/> 
    <log message="Matching Legal Information records move finised at: ${date:now:yyyy-MM-dd HH:mm:ss}"/> 
    <log message="ETL Route Finished At ${date:now:yyyy-MM-dd HH:mm:ss}"/> 

</route> 

Так что в моем тестировании все шло отлично, я мог бы импортировать файл тихий эффективно, как это. Мои проблемы возникли, когда я разместил в папке более 5 файлов. В основном я наблюдаю, как MySQL расширяет пул соединений до максимального размера, а затем не повторно использует соединения.

Таким образом, мы попали 16 одновременных соединений они идут спать после нескольких файлов был загружен, то где-то в файле 4,5,6 вдруг я получаю следующее сообщение об ошибке:

Cannot get a connection, pool error Timeout waiting for idle object 

Или, как он появляется в журнале

[    pool-3-thread-35] oveMatchingAuxRecordsFromStage INFO Matching Extra Detail records move finised at: 2013-07-26 17:41:59 
[    pool-3-thread-35] oveMatchingAuxRecordsFromStage INFO Moving Legal  Information records at: 2013-07-26 17:41:59 
[    pool-3-thread-35] DefaultErrorHandler   INFO Failed delivery for (MessageId: ID-IMS-WS2013-001-52799-1374824474993-0-2693 on ExchangeId: ID-IMS-WS2013- 001-52799-1374824474993-0-3230). On delivery attempt: 0 caught: java.lang.Exception: Cannot get a connection, pool error Timeout waiting for idle object 
[thread #0 - file://d:/RMSInbox] ExtractRMSNewAccountFileRoute INFO STARTING ERROR GENERAL HANDLER 

соединения макс MySQL были вытеснены к массивным 512. Я пробовал различные размеры бассейна, нарезание резьбы и т.д. варианты

в самом интерес ко всему моему JDBC-коду следует этой структуре. Это не сложно SQL просто вставить операторы ....

public class RecordParserErrorReporter { 
private static final String SQL_INSERT="INSERT INTO `ETL`.`ETLLog` "+ 
    " ("+ 
    " `ETL_log_text`, "+ 
    " `filename`, "+ 
    " `record_number`, "+ 
    " `error_message`) "+ 
     " VALUES "+ 
     " ("+ 
      " ?, "+ 
      " ?, "+ 
      " ?, "+ 
      " ? "+ 
     "); "; 

private BasicDataSource dataSource; 
public BasicDataSource getDataSource() { 
    return dataSource; 
} 
public void setDataSource(BasicDataSource dataSource) { 
    this.dataSource = dataSource; 
} 
public void HandleError 
(
     @Body String msgBody 
     , @Headers Map hdr 
     , Exchange exch 
) 
{ 

    Connection conn = null; 
    PreparedStatement stmt=null; 
    try 
    { 

     Exception e = exch.getProperty(Exchange.EXCEPTION_CAUGHT,Exception.class); 
     conn= dataSource.getConnection(); 
     stmt =conn.prepareStatement(SQL_INSERT);      
     stmt.setString(1, msgBody); 
     stmt.setString(2, (String)hdr.get("CamelFileName")); 
     stmt.setInt(3, (Integer)hdr.get("SPLITNUMBER")); 
     stmt.setString(4, e.getMessage()); 
     stmt.executeUpdate();   






    } 
    catch (Exception e) 
    { 
     System.out.println(e.getMessage()); 

    } 

    finally 
    { 
     try 
     { 
       if (stmt!=null) 
       { 
        stmt.close(); 
       } 
       if (conn!=null) 
       { 
        conn.close(); 
        conn= null; 
       } 
     } 
     catch(SQLException e) 
     { 
      System.out.println(e.getMessage()); 

     } 

    } 



} 

}

Как отследить, почему мои связи не повторного использования? Или, если я пропущу связи, как мне отслеживать это?

Update:

Я установил файл потребитель потреблять 1 файл на каждый опрос с секундной задержкой между опросами. Я вижу, что он создает новый пул соединений каждый раз, когда начинается маршрут, а затем не повторно использует ранее созданный пул. Появляется, что для каждого файла, в очереди которого создается новый пул соединений.Не совсем то, что я хочу. Это верно.

Я включил снимок экрана, как это выглядит смотрите ниже:

enter image description here

Update 2:

Это не работает под яровую DM, но синяя печать OSGI. Я довольно убежден, что источник данных получает экземпляр более одного раза.

Update 3:

Ну хорошо, так что источник данных не получает экземпляр более чем один раз. Простые соединения просто не используются. Я действительно нашел что-то интересное, хотя я подозреваю, что это может быть связано с тем, что я вижу больше информации в ссылке здесь: http://fusesource.com/forums/thread.jspa?threadID=4659&tstart=15

Судя по количеству просмотров этого вопроса, я получаю, что я довольно сильно застрял там.

ответ

1

Обнаружена проблема. Это была настоящая ошибка пикника или ошибка id10t на моей стороне.

В моей ETL фасоли я имел следующие строки кода

try 
{ 
     conn= dataSource.getConnection(); 
     stmt =conn.prepareStatement(SQL_ETL_INSERT);  
     stmt.setString(1, (String)hdr.get("CamelFileName")); 
     stmt.executeUpdate();   
     conn= dataSource.getConnection(); 

     stmt =conn.prepareStatement(SQL_ETL_DELETE); 
     stmt.setString(1, (String)hdr.get("CamelFileName")); 
     stmt.executeUpdate();  

    } 
    catch (Exception e) 
    { 

     throw new Exception(e.getMessage()); 

    } 

    finally 
    { 
     try 
     { 
       if (stmt!=null) 
       { 
        stmt.close(); 
        stmt= null; 
       } 
       if (conn!=null) 
       { 
        conn.close(); 
        conn= null; 
       } 
     } 
     catch(SQLException e) 
     { 

      throw new Exception(e.getMessage()); 

     } 

    }  

Примечание Я бег conn= dataSource.getConnection(); дважды! Затем я освобождаю только одно из соединений. Копирование и вставка и поздняя ночная кодировка не смешиваются.