2016-11-20 1 views
0

Например, у меня есть следующий runnable java-код.Можно ли преобразовать следующий код в rxjava

Речь идет о производителях и нескольких параллельных потребителях. Эти потребители выполняют трудоемкие задания, и они работают параллельно.

Интересно, подходит ли этот пример использования rx-java и как его переписать в rx-java.

public class DemoInJava { 
    public static void main(String[] args) { 

     final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(); 

     AtomicBoolean done = new AtomicBoolean(false); 
     Thread producer = new Thread(() -> { 
      int offset = 0; 
      int limit = 10; 
      while (true) { 
       if (queue.isEmpty()) { 
        if (offset < 100) {// there is 100 records in db 
         fetchDataFromDb(offset, limit).forEach(e -> queue.add(e)); 
         offset = offset + limit; 
        } else { 
         done.set(true); 
         break; // no more data 
        } 
       } else { 
        try { 
         Thread.sleep(100l);// I don't like the idea of sleep, but it seems to be the simplest way. 
        } catch (InterruptedException e) { 
        } 
       } 
      } 
     }); 

     List<Thread> consumers = IntStream.range(0, 5).boxed().map(c -> new Thread(() -> 
     { 
      while (true) { 
       Integer i = queue.poll(); 
       if (i != null) { 
        longRunJob(i); 
       } else { 
        if (done.get()) { 
         break; 
        } else { 
         try { 
          Thread.sleep(100l);// I don't like the idea of sleep, but it seems to be the simplest way. 
         } catch (InterruptedException e) { 
         } 
        } 
       } 
      } 
     })).collect(Collectors.toList()); 

     producer.start(); 
     consumers.forEach(c -> c.start()); 
    } 

    private static List<Integer> fetchDataFromDb(int offset, int limit) { 
     return IntStream.range(offset, offset + limit).boxed().collect(Collectors.toList()); 
    } 

    private static void longRunJob(Integer i) { 
     System.out.println(Thread.currentThread().getName() + " long run job of " + i); 
    } 
} 

выход:

.... 
Thread-1 long run job of 7 
Thread-1 long run job of 8 
Thread-1 long run job of 9 
Thread-4 long run job of 10 
Thread-4 long run job of 16 
Thread-10 long run job of 14 
Thread-5 long run job of 15 
Thread-8 long run job of 13 
Thread-7 long run job of 12 
Thread-9 long run job of 11 
Thread-10 long run job of 19 
Thread-4 long run job of 18 
Thread-3 long run job of 17 
.... 

ответ

0

Давайте посмотрим ... Во-первых, код:

package rxtest; 

import static io.reactivex.Flowable.generate; 
import static io.reactivex.Flowable.just; 

import java.util.List; 
import java.util.concurrent.Executors; 
import java.util.stream.Collectors; 
import java.util.stream.IntStream; 

import io.reactivex.Emitter; 
import io.reactivex.Scheduler; 
import io.reactivex.schedulers.Schedulers; 

public class Main { 

    private static final Scheduler SCHEDULER = Schedulers.from(Executors.newFixedThreadPool(10)); 

    private static class DatabaseProducer { 
     private int offset = 0; 
     private int limit = 100; 

     void fetchDataFromDb(Emitter<List<Integer>> queue) { 
      System.out.println(Thread.currentThread().getName() + " fetching "+offset); 
      queue.onNext(IntStream.range(offset, offset + limit).boxed().collect(Collectors.toList())); 
      offset += limit; 
     } 
    } 

    public static void main(String[] args) { 
     generate(new DatabaseProducer()::fetchDataFromDb) 
     .subscribeOn(Schedulers.io()) 
     .concatMapIterable(list -> list, 1) // 1 call, no prefetch 
     .flatMap(item -> 
       just(item) 
       .doOnNext(i -> longRunJob(i)) 
       .subscribeOn(SCHEDULER) 
       , 10) // don't subscribe to more than 10 at a time 
     .take(1000) 
     .blockingSubscribe(); 
    } 

    private static void longRunJob(Integer i) { 
     System.out.println(Thread.currentThread().getName() + " long run job of " + i); 
     try { 
      Thread.sleep(1000); 
     } catch (InterruptedException e) { 
      // TODO Auto-generated catch block 
      e.printStackTrace(); 
     } 
    } 
} 

Класс DatabaseProducer является просто состоянием производителем значений, так как он должен ток смещение. Это не обязательно, так как generate вызов мог быть заменен

 generate(() -> 0, (offset,e) -> { 
      e.onNext(IntStream.range(offset, offset + 100).boxed() 
         .collect(Collectors.toList())); 
      return offset + 100; 
     }, e -> {}); 

Но это не так читаемым.

Имейте в виде, что cocatMap и flatMap может и будет заранее получить и предварительно подписаться на наблюдаемые/текучий вплоть до реализации зависимого от предела, даже если нет свободных потоков для их обработок - они просто ставятся в очередь в планировщиков. Цифры на каждом вызове представляют собой пределы, которые мы хотим иметь - 1 на concatMap, потому что мы хотим получить из базы данных только в случае необходимости (если вы поместите здесь 2, вы можете перечитать, но будет меньше латентности в трубопровод).

Если вы хотите выполнить вычисления с использованием Cpu, тогда лучше использовать Schedulers.computation(), так как это автоматически настроено на количество процессоров системы, на которых работает JVM, и вы можете использовать ее из других частей вашей кодовой базы, чтобы вы не перегружали процессор.

+0

Ну, это отлично работает. Спасибо –

+0

Не забудьте принять ответ; это как работает SO. –

 Смежные вопросы

  • Нет связанных вопросов^_^