107

В JDK 8 с лямбдой b93 существовал класс java.util.stream.Streams.zip in b93, который можно было использовать для zip-потоков (это показано в уроке Exploring Java8 Lambdas. Part 1 by Dhananjay Nene). Эта функция:Спиннинг потоков с использованием JDK8 с lambda (java.util.stream.Streams.zip)

Создает ленивый и последовательный комбинированный поток, элементы которого является результатом объединения элементов двух потоков.

Однако в b98 это исчезло. Infact Streams класс даже не доступен в java.util.stream in b98.

Была ли эта функциональность перемещена, и если да, то как я могу быстро использовать сжатие потоков с помощью b98?

Приложение Я имею в виде in this java implementation of Shen, где я заменил функциональность почтового индекса в

  • static <T> boolean every(Collection<T> c1, Collection<T> c2, BiPredicate<T, T> pred)
  • static <T> T find(Collection<T> c1, Collection<T> c2, BiPredicate<T, T> pred)

функция с довольно подробным кодом (который не использует функциональность от b98).

+0

А только что узнал, что это, кажется, были удалены полностью: http://mail.openjdk.java.net/pipermail/lambda-libs-spec-observers/2013-June/002029.html – artella

+0

возможно дубликат [Java 8 java.util.stream.Streams] (http://stackoverflow.com/questions/16780647/java-8-java-util-stream-streams) – assylias

+0

«Изучение Java8 Lambdas. Часть 1» - новая ссылка для этой статьи http://blog.dhananjaynene.com/2013/02/exploring-java8-lambdas-part-1/ –

ответ

58

мне это было нужно, а так, я просто взял исходный код B93 и поместить его в «Util» класса. Мне пришлось немного изменить его, чтобы работать с текущим API.

Для справки вот рабочий код (взять его на свой страх и риск ...):

public static<A, B, C> Stream<C> zip(Stream<? extends A> a, 
            Stream<? extends B> b, 
            BiFunction<? super A, ? super B, ? extends C> zipper) { 
    Objects.requireNonNull(zipper); 
    Spliterator<? extends A> aSpliterator = Objects.requireNonNull(a).spliterator(); 
    Spliterator<? extends B> bSpliterator = Objects.requireNonNull(b).spliterator(); 

    // Zipping looses DISTINCT and SORTED characteristics 
    int characteristics = aSpliterator.characteristics() & bSpliterator.characteristics() & 
      ~(Spliterator.DISTINCT | Spliterator.SORTED); 

    long zipSize = ((characteristics & Spliterator.SIZED) != 0) 
      ? Math.min(aSpliterator.getExactSizeIfKnown(), bSpliterator.getExactSizeIfKnown()) 
      : -1; 

    Iterator<A> aIterator = Spliterators.iterator(aSpliterator); 
    Iterator<B> bIterator = Spliterators.iterator(bSpliterator); 
    Iterator<C> cIterator = new Iterator<C>() { 
     @Override 
     public boolean hasNext() { 
      return aIterator.hasNext() && bIterator.hasNext(); 
     } 

     @Override 
     public C next() { 
      return zipper.apply(aIterator.next(), bIterator.next()); 
     } 
    }; 

    Spliterator<C> split = Spliterators.spliterator(cIterator, zipSize, characteristics); 
    return (a.isParallel() || b.isParallel()) 
      ? StreamSupport.stream(split, true) 
      : StreamSupport.stream(split, false); 
} 
+1

Не должен ли результирующий поток быть 'SIZED', если _either_ stream' SIZED', а не оба? –

+4

Я так не думаю. Оба потока должны быть «SIZED», чтобы эта реализация работала. На самом деле это зависит от того, как вы определяете zipping. Например, если вы можете закрепить два потока, которые имеют разный размер? Каким будет тогда получившийся поток? Я считаю, что именно поэтому эта функция фактически была исключена из API. Существует много способов сделать это, и пользователь должен решить, какое поведение должно быть «правильным». Не могли бы вы отбросить элементы из более длинного потока или заполнить более короткий список?Если да, с какими значениями? – siki

+0

Если мне что-то не хватает, нет необходимости в каком-либо литье (например, 'Spliterator '). – Jubobs

6

Библиотека Lazy-Seq предоставляет функции zip.

https://github.com/nurkiewicz/LazySeq

Эта библиотека сильно вдохновлена ​​scala.collection.immutable.Stream и стремится обеспечить неизменную, поточно-безопасные и простые в использовании ленивой реализации последовательности, возможно бесконечной.

9

Методы упомянутого вами класса были перенесены в интерфейс Stream в пользу методов по умолчанию. Но кажется, что метод zip был удален. Возможно, потому что неясно, каково поведение по умолчанию для потоков разного размера. Но реализации желаемого поведения прямо вперед:

static <T> boolean every(
    Collection<T> c1, Collection<T> c2, BiPredicate<T, T> pred) { 
    Iterator<T> it=c2.iterator(); 
    return c1.stream().allMatch(x->!it.hasNext()||pred.test(x, it.next())); 
} 
static <T> T find(Collection<T> c1, Collection<T> c2, BiPredicate<T, T> pred) { 
    Iterator<T> it=c2.iterator(); 
    return c1.stream().filter(x->it.hasNext()&&pred.test(x, it.next())) 
     .findFirst().orElse(null); 
} 
+0

Разве это не предикат, который вы передали фильтру * stateful *? Это нарушает контракт по методу и особенно не работает при параллельной обработке потока. – Andreas

+1

@ Андреас: ни одно из решений здесь не поддерживает параллельную обработку. Поскольку мои методы не возвращают поток, они обеспечивают, чтобы потоки не выполнялись параллельно. Аналогично, код принятого ответа возвращает поток, который можно преобразовать в параллель, но фактически не будет делать ничего параллельно. Тем не менее, государственные предикаты не поощряются, но не нарушают контракт. Они могут использоваться даже в параллельном контексте, если вы убедитесь, что обновление состояния является потокобезопасным. В некоторых ситуациях они неизбежны, например. превращение потока в отдельный - это предикат statefull * per se *. – Holger

+2

@Andreas: вы можете догадаться, почему эти операции были удалены из Java API ... – Holger

37

молния является одной из функций, предоставляемых protonpack library.

Stream<String> streamA = Stream.of("A", "B", "C"); 
Stream<String> streamB = Stream.of("Apple", "Banana", "Carrot", "Doughnut"); 

List<String> zipped = StreamUtils.zip(streamA, 
             streamB, 
             (a, b) -> a + " is for " + b) 
           .collect(Collectors.toList()); 

assertThat(zipped, 
      contains("A is for Apple", "B is for Banana", "C is for Carrot")); 
+0

также найден в StreamEx: http://amaembo.github.io/streamex/javadoc/one/util/streamex/StreamEx.html#zip-java.util.List-java.util.List-java.util.function. BiFunction- – tokland

1
public class Tuple<S,T> { 
    private final S object1; 
    private final T object2; 

    public Tuple(S object1, T object2) { 
     this.object1 = object1; 
     this.object2 = object2; 
    } 

    public S getObject1() { 
     return object1; 
    } 

    public T getObject2() { 
     return object2; 
    } 
} 


public class StreamUtils { 

    private StreamUtils() { 
    } 

    public static <T> Stream<Tuple<Integer,T>> zipWithIndex(Stream<T> stream) { 
     Stream<Integer> integerStream = IntStream.range(0, Integer.MAX_VALUE).boxed(); 
     Iterator<Integer> integerIterator = integerStream.iterator(); 
     return stream.map(x -> new Tuple<>(integerIterator.next(), x)); 
    } 
} 
19

Проносясь два потока с использованием JDK8 с лямбда (gist).

public static <A, B, C> Stream<C> zip(Stream<A> streamA, Stream<B> streamB, BiFunction<A, B, C> zipper) { 
    final Iterator<A> iteratorA = streamA.iterator(); 
    final Iterator<B> iteratorB = streamB.iterator(); 
    final Iterator<C> iteratorC = new Iterator<C>() { 
     @Override 
     public boolean hasNext() { 
      return iteratorA.hasNext() && iteratorB.hasNext(); 
     } 

     @Override 
     public C next() { 
      return zipper.apply(iteratorA.next(), iteratorB.next()); 
     } 
    }; 
    final boolean parallel = streamA.isParallel() || streamB.isParallel(); 
    return iteratorToFiniteStream(iteratorC, parallel); 
} 

public static <T> Stream<T> iteratorToFiniteStream(Iterator<T> iterator, boolean parallel) { 
    final Iterable<T> iterable =() -> iterator; 
    return StreamSupport.stream(iterable.spliterator(), parallel); 
} 
+1

Хорошее решение и (относительно) компактное! Требуется, чтобы вы поместили 'import java.util.function. *;' И 'import java.util.stream. *;' В верхней части файла. – sffc

+0

Обратите внимание, что это операция терминала в потоке. Это означает, что для бесконечных потоков этот метод ломается – smac89

1

AOL, cyclops-react, к которому я внести свой вклад, а также обеспечивает сжать функциональность, как через extended Stream implementation, который также реализует реактивно-потоки интерфейса ReactiveSeq, и через StreamUtils, который предлагает большую часть той же функциональности с помощью статических методов к стандартным Java-потоки.

List<Tuple2<Integer,Integer>> list = ReactiveSeq.of(1,2,3,4,5,6) 
                .zip(Stream.of(100,200,300,400)); 


    List<Tuple2<Integer,Integer>> list = StreamUtils.zip(Stream.of(1,2,3,4,5,6), 
                Stream.of(100,200,300,400)); 

Он также предлагает более обобщенные аппликативные основанные на молнии. Например.

ReactiveSeq.of("a","b","c") 
       .ap3(this::concat) 
       .ap(of("1","2","3")) 
       .ap(of(".","?","!")) 
       .toList(); 

    //List("a1.","b2?","c3!"); 

    private String concat(String a, String b, String c){ 
    return a+b+c; 
    } 

И даже способность пары каждый элемент в одном потоке с каждой позиции в другом

ReactiveSeq.of("a","b","c") 
       .forEach2(str->Stream.of(str+"!","2"), a->b->a+"_"+b); 

    //ReactiveSeq("a_a!","a_2","b_b!","b_2","c_c!","c2") 
0

Это здорово. Я должен был сжать два потока в карту с одним потоком является ключевым и другими являющимся значением

Stream<String> streamA = Stream.of("A", "B", "C"); 
Stream<String> streamB = Stream.of("Apple", "Banana", "Carrot", "Doughnut");  
final Stream<Map.Entry<String, String>> s = StreamUtils.zip(streamA, 
        streamB, 
        (a, b) -> { 
         final Map.Entry<String, String> entry = new AbstractMap.SimpleEntry<String, String>(a, b); 
         return entry; 
        }); 

System.out.println(s.collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue()))); 

Выхода: {А = яблоко, В = банане, С = Морковью}

7

Так как я могу «т себе любое использование проносясь на других, чем индексируются те (списки) коллекции, и я большой поклонник простоты, это было бы мое решение:

<A,B,C> Stream<C> zipped(List<A> lista, List<B> listb, BiFunction<A,B,C> zipper){ 
    int shortestLength = Math.min(lista.size(),listb.size()); 
    return IntStream.range(0,shortestLength).mapToObject(i -> { 
      return zipper.apply(lista.get(i), listb.get(i)); 
    });   
} 
12

Если у вас есть Guava в вашем проекте, вы можете использовать Streams.zip способ (был добавлен в Guava 21):

Возвращает поток, в котором каждый элемент является результатом передачи соответствующего элемента каждого из streamA и streamB в функцию. Результирующий поток будет только до тех пор, пока более короткий из двух входных потоков; если один поток длиннее, его дополнительные элементы будут проигнорированы. Полученный поток неэффективно расщепляется. Это может повредить параллельной работе.

public class Streams { 
    ... 

    public static <A, B, R> Stream<R> zip(Stream<A> streamA, 
      Stream<B> streamB, BiFunction<? super A, ? super B, R> function) { 
     ... 
    } 
} 
0

Я смиренно предлагаю эту реализацию. Полученный поток усекается до более короткого из двух входных потоков.

public static <L, R, T> Stream<T> zip(Stream<L> leftStream, Stream<R> rightStream, BiFunction<L, R, T> combiner) { 
    Spliterator<L> lefts = leftStream.spliterator(); 
    Spliterator<R> rights = rightStream.spliterator(); 
    return StreamSupport.stream(new AbstractSpliterator<T>(Long.min(lefts.estimateSize(), rights.estimateSize()), lefts.characteristics() & rights.characteristics()) { 
     @Override 
     public boolean tryAdvance(Consumer<? super T> action) { 
      return lefts.tryAdvance(left->rights.tryAdvance(right->action.accept(combiner.apply(left, right)))); 
     } 
    }, leftStream.isParallel() || rightStream.isParallel()); 
} 

 Смежные вопросы

  • Нет связанных вопросов^_^