2013-03-06 4 views
1

Я знаю, что могу использовать JMS и ActiveMQ, но мне действительно нужно что-то очень простое и без большого количества накладных расходов. Я провел некоторое тестирование с ActiveMQ и не очень понравилось выполнение очередей сохранения.Есть ли какая-либо блокировка Java-блокировки, которая может сохранять данные на жесткий диск при достижении предела

Что я ищу - это базовая реализация любой блокирующей очереди с возможностью хранения сообщения на жестком диске (в идеале), если достигнут предел определенного размера. Затем он должен иметь возможность читать сохраненное сообщение с жесткого диска и, если возможно, прекратить запись нового на жесткий диск (восстановление в памяти).

Мой сценарий очень прост - сообщения (json) поступают извне. Я делаю некоторую обработку, а затем отправляю их в другую службу REST. Проблема может возникнуть, когда целевая служба REST отключена или сеть между нами плохая. В этом случае готовые к работе события хранятся в очереди, которая потенциально может заполнить всю доступную память. Я не хочу/не должен писать каждое сообщение на HDD/DB - только те, которые не могут вписаться в память.

Спасибо!

+1

То, о чем вы просите, не является «чем-то очень простым». Вы, наверное, хотите «что-то надежное». –

+0

ehcache - это самый простой способ, с помощью которого я могу прозрачно перемещать данные и выключать диски. Если порядок очередей важен, вам придется самому это обработать. – Affe

+0

Да, порядок очереди важен. Кроме того, когда я сказал - «что-то очень простое», я имел в виду, что мне не нужны кластерные решения для предприятий (потому что я могу использовать ActiveMQ). Вся магия должна произойти внутри 1 JVM. Дополнительная приятная функция - если JVM остановлена ​​- заполняет очередь с жесткого диска, если есть какие-либо сообщения. – Alex

ответ

0

Этот код должен работать для вас - сво в памяти постоянной очереди блокировки - нуждается в некоторой настройке файла, но должны работать

package test; 

    import java.io.BufferedReader; 
    import java.io.BufferedWriter; 
    import java.io.File; 
    import java.io.FileReader; 
    import java.io.FileWriter; 
    import java.io.IOException; 
    import java.util.ArrayList; 
    import java.util.Collections; 
    import java.util.LinkedList; 
    import java.util.List; 

    public class BlockingQueue { 

    //private static Long maxInMenorySize = 1L; 
    private static Long minFlushSize = 3L; 

    private static String baseDirectory = "/test/code/cache/"; 
    private static String fileNameFormat = "Table-"; 

    private static String currentWriteFile = ""; 

    private static List<Object> currentQueue = new LinkedList<Object>(); 
    private static List<Object> lastQueue = new LinkedList<Object>(); 

    static{ 
     try { 
      load(); 
     } catch (IOException e) { 
      System.out.println("Unable To Load"); 
      e.printStackTrace(); 
     } 
    } 

    private static void load() throws IOException{ 
     File baseLocation = new File(baseDirectory); 
     List<String> fileList = new ArrayList<String>(); 

     for(File entry : baseLocation.listFiles()){ 
      if(!entry.isDirectory() && entry.getName().contains(fileNameFormat)){ 
       fileList.add(entry.getAbsolutePath()); 
      } 
     } 

     Collections.sort(fileList); 

     if(fileList.size()==0){ 
      //currentQueue = lastQueue = new ArrayList<Object>(); 
      currentWriteFile = baseDirectory + "Table-1"; 
      BufferedWriter writer = new BufferedWriter(new FileWriter(currentWriteFile)); 
      while (!lastQueue.isEmpty()){ 
       writer.write(lastQueue.get(0).toString()+ "\n"); 
       lastQueue.remove(0); 
      } 
      writer.close(); 
     }else{ 
      if(fileList.size()>0){ 
        BufferedReader reader = new BufferedReader(new FileReader(fileList.get(0))); 
        String line=null; 
        while ((line=reader.readLine())!=null){ 
         currentQueue.add(line); 
        } 
        reader.close(); 
        File toDelete = new File(fileList.get(0)); 
        toDelete.delete(); 
      } 

      if(fileList.size()>0){ 
       BufferedReader reader = new BufferedReader(new FileReader(fileList.get(fileList.size()-1))); 
       currentWriteFile = fileList.get(fileList.size()-1); 
       String line=null; 
       while ((line=reader.readLine())!=null){ 
        lastQueue.add(line); 
       } 
       reader.close(); 
       //lastFileNameIndex=Long.parseLong(fileList.get(fileList.size()).substring(6, 9)); 
      } 
     } 

    } 

    private void loadFirst() throws IOException{ 
     File baseLocation = new File(baseDirectory); 
     List<String> fileList = new ArrayList<String>(); 

     for(File entry : baseLocation.listFiles()){ 
      if(!entry.isDirectory() && entry.getName().contains(fileNameFormat)){ 
       fileList.add(entry.getAbsolutePath()); 
      } 
     } 

     Collections.sort(fileList); 

     if(fileList.size()>0){ 
       BufferedReader reader = new BufferedReader(new FileReader(fileList.get(0))); 
       String line=null; 
       while ((line=reader.readLine())!=null){ 
        currentQueue.add(line); 
       } 
       reader.close(); 
       File toDelete = new File(fileList.get(0)); 
       toDelete.delete(); 
     } 
    } 

    public Object pop(){ 
     if(currentQueue.size()>0) 
      return currentQueue.remove(0); 

     if(currentQueue.size()==0){ 
      try { 
       loadFirst(); 
      } catch (IOException e) { 
       // TODO Auto-generated catch block 
       e.printStackTrace(); 
      } 
     } 

     if(currentQueue.size()>0) 
      return currentQueue.remove(0); 
     else 
      return null; 
    } 

    public synchronized Object waitTillPop() throws InterruptedException{ 
     if(currentQueue.size()==0){ 
      try { 
       loadFirst(); 
      } catch (IOException e) { 
       // TODO Auto-generated catch block 
       e.printStackTrace(); 
      } 
      if(currentQueue.size()==0) 
       wait(); 
     } 
     return currentQueue.remove(0); 
    } 

    public synchronized void push(Object data) throws IOException{ 
     lastQueue.add(data); 
     this.notifyAll(); 
     if(lastQueue.size()>=minFlushSize){ 
      BufferedWriter writer = new BufferedWriter(new FileWriter(currentWriteFile)); 
      while (!lastQueue.isEmpty()){ 
       writer.write(lastQueue.get(0).toString() + "\n"); 
       lastQueue.remove(0); 
      } 
      writer.close(); 

      currentWriteFile = currentWriteFile.substring(0,currentWriteFile.indexOf("-")+1) + 
        (Integer.parseInt(currentWriteFile.substring(currentWriteFile.indexOf("-")+1,currentWriteFile.length())) + 1); 
     } 
    } 

    public static void main(String[] args) { 
     try { 
      BlockingQueue bq = new BlockingQueue(); 

      for(int i =0 ; i<=8 ; i++){ 
       bq.push(""+i); 
      } 

      System.out.println(bq.pop()); 
      System.out.println(bq.pop()); 
      System.out.println(bq.pop()); 

      System.out.println(bq.waitTillPop()); 
      System.out.println(bq.waitTillPop()); 
      System.out.println(bq.waitTillPop()); 
      System.out.println(bq.waitTillPop()); 



     } catch (Exception e) { 
      e.printStackTrace(); 
     } 
    } 


} 
+0

Хорошая попытка, так что не будет голосовать, но я не могу доверять этому в производственной среде, поскольку JVM может потерпеть крах, прежде чем что-то останется на диске. – user924272

0

Хорошо, так что наличие вашей очереди сохраняется на диск будет работать, если вы снова ваша очередь с RandomAccessFile, MemoryMappedFile или MappedByteBuffer .. или какой-либо другой эквивалентной реализации. В случае сбоя или завершения JVM преждевременно, вы можете в значительной степени полагаться на свою операционную систему, чтобы сохранять незафиксированные буферы на диск. Предостережение заключается в том, что если ваш компьютер аварийно завершает работу, вы можете попрощаться с любыми обновлениями в вашей очереди, поэтому убедитесь, что вы это понимаете. Вы можете синхронизировать свой диск для гарантированного сохранения, хотя и с большим ударом по производительности. С более хардкорной точки зрения другой вариант заключается в том, чтобы реплицировать на другую машину для резервирования, что требует отдельного ответа, учитывая его сложность.

 Смежные вопросы

  • Нет связанных вопросов^_^