2011-05-03 2 views
0

Это код, который я использую для реализации очереди. Здесь очередь запросов всегда возвращает значение null, даже если очередь не пуста.связанный список Очередь в java не работает с потоками

Runnable runnable = new Runnable() { 

    @Override 
    public void run() { 
     service.schedule(runnable, 500, TimeUnit.MILLISECONDS); 
     process(); 
    } 

    public void process() { 
     try { 
      String tt = nextItem(); 
      //System.out.println("SQ:"+tt); 
     } catch (Exception e) {//Catch exception if any 
      System.out.println("2Error: " + e.getMessage()); 
     } 
    } 
}; 

public String nextItem() { 
    Object poll; 
    try { 
     synchronized (queue) { 
      System.out.println("SQ:" + queue.poll()); 
      //if (poll != null) { 
      // return poll.toString(); 
      //} else { 
      return ""; 
      //} 
     } 
    } catch (Exception e) { 
     e.printStackTrace(); 
     return ""; 
    } 
} 

public void run() { 
    try { 
     Class.forName("com.mysql.jdbc.Driver"); 
     String url = 
       "jdbc:mysql://1xxx:3306/ayan"; 
     Connection con = 
       DriverManager.getConnection(
       url, "[user]", "[pass]"); 

     Queue queue = new LinkedList(); 
     service = Executors.newScheduledThreadPool(1000); 
     service.schedule(runnable, 0, TimeUnit.MILLISECONDS); 
     while (true) { 
      Statement statement = con.createStatement(); 
      statement.setFetchSize(1); 
      ResultSet resultSet = statement.executeQuery("SELECT * from query_q"); 
      while (resultSet.next()) { 
       // process results. each call to next() should fetch the next row 
       String id = resultSet.getString("id"); 
       String query = resultSet.getString("query"); 
       String msisdn = resultSet.getString("msisdn"); 
       String pass = id + "|" + query + "|" + msisdn; 
       System.out.println("MQ:" + pass); 
       //String str = "foo"; 
       //Queue<Character> charsQueue = new LinkedList<Character>(); 
       boolean inserted = false; 
       for (char c : pass.toCharArray()) { 
        inserted = queue.offer(c); 
       } 

       if (inserted != false) { 
        // Statement stats = con.createStatement(); 
        //stats.executeUpdate("delete from query_q where id=" + id); 
       } 
      } 
      Thread.sleep(10000); 
     } 
     //con.close(); 
} 
+0

Вы могли бы создать компактный тестовый пример, который показывает проблему? – gd1

+0

И _reads_ И _writes_ должны быть «синхронизированы». – mre

ответ

1

LinkedList - единственная небезопасная очередь Queue. Любая другая реализация была бы лучшим выбором. Ваше предложение не синхронизировано. ;)

У ExecutorService есть встроенная очередь. Вы можете использовать это, а не создавать свою собственную. Просто выполняйте (Runnable) задачи, поскольку вам нужно что-то делать.

0

Это потому, что вы не синхронизируете очередь для очереди queue.offer(). Вам необходимо синхронизировать весь доступ к очереди.

Самый простой способ сделать это - использовать LinkedBlockingQueue, который позаботится обо всех синхронизациях для вас.

0

Обратите внимание, что вы называете offer() и poll() на различных queue-х - offer() 's queue является локальной переменной, в то время как poll() «s один, вероятно, поле:

Queue queue = new LinkedList(); 

Кроме того, требуется syncrhonization, как это было предложено в других ответах.