Teaching old Java Streams new tricks

Implement the Gatherer interface (java.util.stream.Gatherer) so that you can write:

Stream.of("foo", "bar")
  .gather(yourOwnSpecializedFunction())
  .toList();
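A minimal sketch of what `yourOwnSpecializedFunction()` could look like, assuming JDK 24 or later (where Stream Gatherers, JEP 485, are a final feature). The upper-casing behavior and the class name are hypothetical; the point is the smallest possible gatherer: no state, just an integrator that emits one output per input.

```java
import java.util.List;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

class CustomGathererDemo {

    // Hypothetical gatherer: upper-cases each String (stateless, 1:1).
    static Gatherer<String, ?, String> yourOwnSpecializedFunction() {
        // A Greedy integrator never stops the stream on its own; it just forwards
        // what downstream.push(...) reports (false = downstream wants no more).
        Gatherer.Integrator.Greedy<Void, String, String> integrator =
            (unused, element, downstream) -> downstream.push(element.toUpperCase());
        // Gatherer.of with only an integrator: no state (Void) and parallelizable.
        return Gatherer.of(integrator);
    }

    public static void main(String[] args) {
        List<String> result = Stream.of("foo", "bar")
            .gather(yourOwnSpecializedFunction())
            .toList();
        System.out.println(result); // [FOO, BAR]
    }
}
```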

Some useful gatherers that ship with Java (in java.util.stream.Gatherers):

  • fold
  • scan
  • windowFixed
  • windowSliding
  • mapConcurrent

Stream Anatomy:

  • Source
    • ex: Stream.of
  • Intermediate operation
    • ex: .filter, .map
  • Terminal operation
    • ex: .toList
  • Pipeline: the whole operation, from source to terminal operation
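The three parts of the anatomy, labeled on a small pipeline (the numbers and the class name `StreamAnatomy` are just for illustration):

```java
import java.util.List;
import java.util.stream.Stream;

class StreamAnatomy {

    static List<Integer> pipeline() {
        return Stream.of(1, 2, 3, 4, 5, 6)   // source
            .filter(n -> n % 2 == 0)         // intermediate operation (lazy)
            .map(n -> n * 10)                // intermediate operation (lazy)
            .toList();                       // terminal operation: runs the whole pipeline
    }

    public static void main(String[] args) {
        System.out.println(pipeline()); // [20, 40, 60]
    }
}
```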

What features do we need?

  • consume-to-produce ratios (1:1, 1:N, N:1, N:M)
  • finite or infinite
    • infinite requires incremental operation
    • finite benefits from incremental operation
  • stateful or stateless
    • some operations are inherently stateless / stateful
  • frugal or greedy
    • frugal requires incremental operation
    • greedy benefits from incremental operation
  • sequential or parallelizable
    • some operations are trivially parallelizable
    • some operations can only run sequentially
  • finishing touches or not
    • some operations can only work when they know that there are no more input elements
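To illustrate why infinite and frugal streams need incremental operation: an integrator that returns `false` tells the stream to stop sending elements. A sketch, assuming a hypothetical `takeWhileGatherer` helper (the JDK already offers `Stream.takeWhile`; this only shows the mechanism):

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

class FrugalGathererDemo {

    // Hypothetical takeWhile-style gatherer: passes elements through until the
    // predicate fails, then returns false so no further elements are requested.
    static <T> Gatherer<T, ?, T> takeWhileGatherer(Predicate<? super T> predicate) {
        // Not Greedy: this integrator may initiate short-circuiting by returning false.
        Gatherer.Integrator<Void, T, T> integrator = (unused, element, downstream) ->
            predicate.test(element) && downstream.push(element);
        // Sequential, since "stop at the first failing element" depends on encounter order.
        return Gatherer.ofSequential(integrator);
    }

    public static void main(String[] args) {
        // Infinite source: this terminates only because the gatherer works
        // incrementally and signals when it is done.
        List<Integer> firstFew = Stream.iterate(1, n -> n + 1)
            .gather(takeWhileGatherer(n -> n < 5))
            .toList();
        System.out.println(firstFew); // [1, 2, 3, 4]
    }
}
```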

Stream.gather(Gatherer): an extensible API for intermediate operations:

interface Gatherer<T, A, R> {
  // Provides the state, if any, to be used during evaluation of the gatherer.
  Supplier<A> initializer();
  // Each input element is passed to the integrator, together with the state and a Downstream handle, for as long as the integrator returns `true`.
  Integrator<A, T, R> integrator();
  // If it returns anything but the default combiner, this Gatherer can be parallelized.
  // When parallelized, it is used to merge partial states into one.
  BinaryOperator<A> combiner();
  // When there are no more input elements, this function is invoked with the state and a Downstream handle to perform a final action.
  BiConsumer<A, Downstream<? super R>> finisher();
}
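A sketch that supplies all four functions through `Gatherer.of(initializer, integrator, combiner, finisher)`. `countAll` is a hypothetical N:1 gatherer that emits how many elements it saw; because it provides a combiner, it also works on a parallel stream.

```java
import java.util.stream.Gatherer;
import java.util.stream.IntStream;
import java.util.stream.Stream;

class FourFunctionGatherer {

    // Hypothetical N:1 gatherer that emits a single element: how many inputs it saw.
    // The state is a one-element long[] so it can be updated in place.
    static <T> Gatherer<T, long[], Long> countAll() {
        return Gatherer.of(
            () -> new long[1],                               // initializer: fresh state
            Gatherer.Integrator.ofGreedy((state, element, downstream) -> {
                state[0]++;                                  // integrator: consume, emit nothing
                return true;
            }),
            (left, right) -> {                               // combiner: merge partial counts
                left[0] += right[0];
                return left;
            },
            (state, downstream) -> downstream.push(state[0]) // finisher: emit the total once
        );
    }

    public static void main(String[] args) {
        System.out.println(Stream.of("a", "b", "c").gather(countAll()).toList()); // [3]
        // Because a combiner is provided, the same gatherer can also run in parallel.
        System.out.println(IntStream.rangeClosed(1, 1_000).boxed().parallel()
            .gather(countAll()).toList()); // [1000]
    }
}
```

Swapping the combiner for the default (for example by building the same thing with `Gatherer.ofSequential`) keeps the result but gives up parallel evaluation of the gatherer.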

Some gatherers included in Java:

Gatherers.fold(): stateful, inherently ordered, N:1 operation

Stream.of(1,2,3,4).gather(Gatherers.fold(() -> "", (string, i) -> string + i)).findFirst();
// Optional[1234]
Stream.of(1,2,3,4).gather(Gatherers.fold(() -> 0, (sum, i) -> sum + i)).findFirst();
// Optional[10]
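One way to see why `fold` is ordered and N:1 is to rebuild it from the interface. `myFold` is a hypothetical sketch, not the JDK source: there is no combiner, so it is created with `Gatherer.ofSequential`, the integrator never emits, and the finisher pushes the single result.

```java
import java.util.Optional;
import java.util.function.BiFunction;
import java.util.function.Supplier;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

class FoldSketch {

    // Hypothetical fold built from the Gatherer interface (illustration, not the JDK source).
    static <T, R> Gatherer<T, ?, R> myFold(Supplier<R> initial,
                                          BiFunction<? super R, ? super T, ? extends R> folder) {
        // Mutable holder for the running accumulator.
        class State { R value = initial.get(); }
        return Gatherer.ofSequential(                           // no combiner: sequential only
            State::new,                                         // initializer
            Gatherer.Integrator.ofGreedy((state, element, downstream) -> {
                state.value = folder.apply(state.value, element);
                return true;                                    // N inputs, nothing emitted yet
            }),
            (state, downstream) -> downstream.push(state.value) // finisher: the single output
        );
    }

    public static void main(String[] args) {
        Optional<String> s = Stream.of(1, 2, 3, 4)
            .gather(myFold(() -> "", (string, i) -> string + i)).findFirst();
        Optional<Integer> n = Stream.of(1, 2, 3, 4)
            .gather(myFold(() -> 0, (sum, i) -> sum + i)).findFirst();
        System.out.println(s); // Optional[1234]
        System.out.println(n); // Optional[10]
    }
}
```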

Gatherers.scan(): stateful, inherently ordered, 1:1 operation

Stream.of(1,2,3,4).gather(Gatherers.scan(() -> "", (string, i) -> string + i)).toList();
// [1, 12, 123, 1234]
Stream.of(1,2,3,4).gather(Gatherers.scan(() -> 0, (sum, i) -> sum + i)).toList();
// [1, 3, 6, 10]
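`scan` differs from `fold` mainly in when it emits: every integration pushes the running value downstream, which makes it 1:1 and removes the need for a finisher. A hypothetical `myScan` sketch along the same lines (again not the JDK source):

```java
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Supplier;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

class ScanSketch {

    // Hypothetical scan built from the Gatherer interface (illustration, not the JDK source).
    static <T, R> Gatherer<T, ?, R> myScan(Supplier<R> initial,
                                          BiFunction<? super R, ? super T, ? extends R> scanner) {
        class State { R current = initial.get(); }
        return Gatherer.ofSequential(                     // ordered: each step builds on the previous one
            State::new,
            Gatherer.Integrator.ofGreedy((state, element, downstream) -> {
                state.current = scanner.apply(state.current, element);
                return downstream.push(state.current);    // 1 input -> 1 output
            })
        );
    }

    public static void main(String[] args) {
        List<String> strings = Stream.of(1, 2, 3, 4)
            .gather(myScan(() -> "", (string, i) -> string + i)).toList();
        List<Integer> sums = Stream.of(1, 2, 3, 4)
            .gather(myScan(() -> 0, (sum, i) -> sum + i)).toList();
        System.out.println(strings); // [1, 12, 123, 1234]
        System.out.println(sums);    // [1, 3, 6, 10]
    }
}
```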

Gatherers.windowFixed(...): stateful, inherently ordered, N:M operation

Stream.of(1,2,3,4,5,6,7,8,9).gather(Gatherers.windowFixed(3)).toList();
// [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
Stream.of(1,2,3,4,5,6,7,8,9).gather(Gatherers.windowFixed(4)).toList();
// [[1, 2, 3, 4], [5, 6, 7, 8], [9]]
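For `windowFixed`, the integrator emits whenever a window fills up and the finisher flushes the leftover partial window, which is where the `[9]` above comes from. A hypothetical `myWindowFixed` sketch (not the JDK source):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

class WindowFixedSketch {

    // Hypothetical fixed-size windowing gatherer (illustration, not the JDK source).
    static <T> Gatherer<T, ?, List<T>> myWindowFixed(int size) {
        if (size < 1) throw new IllegalArgumentException("size must be at least 1");
        class State { List<T> window = new ArrayList<>(size); }
        return Gatherer.ofSequential(
            State::new,
            Gatherer.Integrator.ofGreedy((state, element, downstream) -> {
                state.window.add(element);
                if (state.window.size() < size) return true;  // window not full: emit nothing
                List<T> full = state.window;                   // N inputs -> 1 output
                state.window = new ArrayList<>(size);          // start the next window
                return downstream.push(full);
            }),
            (state, downstream) -> {                           // finisher: flush the partial last window
                if (!state.window.isEmpty()) downstream.push(state.window);
            }
        );
    }

    public static void main(String[] args) {
        System.out.println(Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9).gather(myWindowFixed(3)).toList());
        // [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
        System.out.println(Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9).gather(myWindowFixed(4)).toList());
        // [[1, 2, 3, 4], [5, 6, 7, 8], [9]]
    }
}
```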

Gatherers.windowSliding(...): stateful, inherently ordered, N:M operation

Stream.of(1,2,3,4,5).gather(Gatherers.windowSliding(2)).toList();
// [[1, 2], [2, 3], [3, 4], [4, 5]]
Stream.of(1,2,3,4,5).gather(Gatherers.windowSliding(4)).toList();
// [[1, 2, 3, 4], [2, 3, 4, 5]]
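`windowSliding` keeps the last `size` elements and, once the window is full, emits a copy each time a new element arrives. A hypothetical `myWindowSliding` sketch (not the JDK source); its finisher assumes that a stream shorter than the window yields one partial window, as `Gatherers.windowSliding` documents. Because it uses an `ArrayDeque`, this sketch rejects `null` elements.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

class WindowSlidingSketch {

    // Hypothetical sliding-window gatherer (illustration, not the JDK source).
    // ArrayDeque rejects null, so this sketch does not support null elements.
    static <T> Gatherer<T, ?, List<T>> myWindowSliding(int size) {
        if (size < 1) throw new IllegalArgumentException("size must be at least 1");
        class State {
            final ArrayDeque<T> window = new ArrayDeque<>(size);
            boolean emitted = false;
        }
        return Gatherer.ofSequential(
            State::new,
            Gatherer.Integrator.ofGreedy((state, element, downstream) -> {
                if (state.window.size() == size) state.window.removeFirst(); // slide forward by one
                state.window.addLast(element);
                if (state.window.size() < size) return true;                // not full yet
                state.emitted = true;
                List<T> snapshot = new ArrayList<>(state.window);           // copy, since the deque keeps changing
                return downstream.push(snapshot);
            }),
            (state, downstream) -> {
                // Assumption: a stream shorter than the window yields one partial window.
                if (!state.emitted && !state.window.isEmpty()) {
                    List<T> partial = new ArrayList<>(state.window);
                    downstream.push(partial);
                }
            }
        );
    }

    public static void main(String[] args) {
        System.out.println(Stream.of(1, 2, 3, 4, 5).gather(myWindowSliding(2)).toList());
        // [[1, 2], [2, 3], [3, 4], [4, 5]]
        System.out.println(Stream.of(1, 2, 3, 4, 5).gather(myWindowSliding(4)).toList());
        // [[1, 2, 3, 4], [2, 3, 4, 5]]
    }
}
```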

Gatherers.mapConcurrent(...): stateful, order-preserving, 1:1 operation (the mapper runs concurrently on virtual threads)

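Stream.of(1,2,10,20,30,40)
  .gather(Gatherers.mapConcurrent(2, n -> {   // at most 2 tasks in flight
    try { Thread.sleep(n * 1000); }           // simulate n seconds of work
    catch (InterruptedException i) {
      System.out.println("Task " + n + " was interrupted");
      Thread.currentThread().interrupt();     // restore the interrupt flag
    }
    return n;
  }))
  .limit(2)                                   // once 2 results arrive, in-flight tasks are cancelled
  .toList();
// Task 10 was interrupted
// Task 20 was interrupted
// [1, 2]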
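The ordering guarantee can also be seen in isolation. In this sketch (class and helper names are hypothetical), three tasks run concurrently and the 100 ms one finishes first, yet the results still come out in encounter order:

```java
import java.util.List;
import java.util.stream.Gatherers;
import java.util.stream.Stream;

class MapConcurrentOrder {

    // Hypothetical helper: sleeps for the given number of milliseconds, then returns it.
    static int sleepAndReturn(int millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
        }
        return millis;
    }

    static List<Integer> run() {
        // maxConcurrency = 3: all three tasks run at the same time, so the 100 ms task
        // finishes first, yet results are emitted in encounter order.
        return Stream.of(300, 100, 200)
            .gather(Gatherers.mapConcurrent(3, millis -> sleepAndReturn(millis)))
            .toList();
    }

    public static void main(String[] args) {
        System.out.println(run()); // [300, 100, 200]
    }
}
```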