2017-02-14 32 views
1

Я новичок в Hazelcast, и я пытаюсь использовать его для хранения данных на карте, которая слишком велика, чем возможно для установки на одной машине.Hazelcast keySet streaming?

Одним из процессов, который мне нужно реализовать, является перебросить каждое из значений на карте и сделать с ними что-то, а не накапливать или агрегировать, и мне не нужно сразу просматривать все данные, поэтому это не проблема памяти.

Моя тривиальная реализация заключалась бы в использовании IMap.keySet(), а затем для итерации по всем ключам, чтобы поочередно получить каждое сохраненное значение (и разрешить значение GCed после обработки), но я обеспокоен тем, что будет так много данных в системе, что даже просто получение списка ключей будет достаточно большим, чтобы наложить чрезмерное напряжение на систему.

Я надеялся, что существует потоковый API, с помощью которого я могу передавать ключи (или даже полные записи) таким образом, что локальному узлу не нужно будет кэшировать весь набор локально, но не найти ничего, что казалось бы актуальным мне в документации.

Буду признателен за любые предложения, которые вы можете придумать. Благодарю.

+0

действительно ли мой ответ имеет смысл для вас? –

+0

привет Гусс. есть вопросы? в противном случае, пожалуйста, примите мой ответ. Спасибо –

+0

Спасибо за ваш ответ, в конце концов это было не то, с чем мы пошли, но по мере того, как проект был также запущен, я не очень инвестировал в это :-). Для будущих проектов я хотел бы посмотреть на Jet в любом случае, и введение, которое вы предоставили, было очень полезно. – Guss

ответ

1

Hazelcast Jet предоставляет распределенную версию j.u.s и добавляет возможности «потоковой передачи» до IMap. Позволяет выполнять API Java Streams API в кластере Hazelcast.

import com.hazelcast.jet.JetInstance; 
import com.hazelcast.jet.stream.DistributedCollectors; 
import com.hazelcast.jet.stream.IStreamMap; 
import com.hazelcast.jet.stream.IStreamList; 

import static com.hazelcast.jet.stream.DistributedCollectors.toIList; 

    final IStreamMap<String, Integer> streamMap = instance1.getMap("source"); 
    // stream of entries, you can grab keys from it 
    IStreamList<String> counts = streamMap.stream() 
        .map(entry -> entry.getKey().toLowerCase()) 
        .filter(key -> key.length() >= 5) 
        .sorted() 
        // this will store the result on cluster as well 
        // so there is no data movement between client and cluster 
        .collect(toIList()); 

Пожалуйста, найти более подробную информацию о самолете here и другие примеры here.

Приветствие, Vik

+0

Поток записей еще лучше. Вопросы: '.flatMap (o -> Stream.of (o.getSingleVal()))' кажется излишним? Что вы имеете в виду, когда пишете «так что нет движения данных между клиентом и кластером» - не все ли записи потока к клиенту, где мы запускаем этот процесс? – Guss

+0

Клиент отправит вычислительную задачу, описанную в вашем конвейере 'j.u.s', чтобы мы сгруппировали. Он будет выполнен в кластере, и результат будет записан в кластер (проверьте, что метод 'toIList' collect) –

+0

Я обновил фрагмент с помощью импорта –

0

Хотя реализация потока Hazelcast Jet выглядит впечатляюще, у меня не было много времени, чтобы инвестировать в глядя на модернизации в Hazelcast Jet (в нашем довольно много болотного стандарте vert.x настроить). Вместо этого я использовал IMap.executeOnEntries, который, кажется, делает примерно то же самое, что и для Hazelcast Jet by @Vik Gamov, за исключением более раздражающего синтаксиса. не

Мой пример:

myMap.executeOnEntries(new EntryProcessor<String,MyEntity>(){ 
    private static final long serialVersionUID = 1L; 
    @Override 
    public Object process(Entry<String, MyEntity> entry) { 
     entry.getValue().fondle(); 
     return null; 
    } 
    @Override 
    public EntryBackupProcessor<String, MyEntity> getBackupProcessor() { 
     return null; 
    }}); 

Как вы можете видеть, синтаксис очень раздражает:

  1. Нам нужно создать реальный объект, который может быть сериализован в кластер - не фантазии лямбды здесь (не используйте мой серийный идентификатор, если вы скопируете & вставьте это - его сломанный по дизайну).
  2. Одна из причин, по которым не может быть лямбда, заключается в том, что интерфейс не является функциональным - вам нужен другой метод обработки резервных копий (или, по крайней мере, объявить, что вы не хотите обрабатывать их, как я), что, хотя я acknolwedge его важность, это не важно все время, и я бы догадался, что его единственный важный в редких случаях.
  3. Очевидно, вы не можете (или, по крайней мере, его нетривиально) возвращать данные из процесса - что не важно в моем случае, но все же.

 Смежные вопросы

  • Нет связанных вопросов^_^