
Java 8 Streams


Motivation

Viewing code as data transformations

public int calcTotalWorkingHoursFromFile(Path path) throws IOException {
  try (BufferedReader reader = Files.newBufferedReader(path)) {
    String line;
    int sum = 0;
    while ((line = reader.readLine()) != null) {
      Employee employee = new Employee(line);
      if (employee.getJob() == Job.PROGRAMMER) {
        Project project = employee.getProject();
        sum += project.getWorkingHours();
      }
    }
    return sum;
  }
}

  • Read line
  • new Employee(line)
  • .getProject()
  • .getWorkingHours()
  • Sum
String.split(String.upcase("wtf elixir"))
"wtf elixir" |> String.upcase() |> String.split()
defp calc_total_working_hours_from_file(file) do
    file
    |> File.read!()  # line1\nline2\nline3...
    |> String.split("\n", trim: true)
    |> Enum.map(&Employee.new/1)
    |> Enum.map(&Employee.get_project/1)
    |> Enum.map(&Project.get_workingHours/1)
    |> Enum.sum()
end

public int calcTotalWorkingHoursFromFile(Path path) throws IOException {
  try (Stream<String> stream = Files.lines(path)) {
    return stream.map(Employee::new)
        .filter(e -> e.getJob() == Job.PROGRAMMER)
        .map(Employee::getProject)
        .mapToInt(Project::getWorkingHours)
        .sum();
  }
}

Streams ⟹ Lambdas ⟹ Functional interfaces
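
The chain above can be read as: stream operations take lambdas, and every lambda is an instance of a functional interface. A minimal sketch of that idea; the class name and the constants `IS_LONG` and `FIRST_CHAR` are hypothetical and chosen only for illustration.

```java
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class FunctionalInterfaceSketch {

  // A lambda is an instance of a functional interface (hypothetical names).
  static final Predicate<String> IS_LONG = s -> s.length() > 5;
  static final Function<String, Character> FIRST_CHAR = s -> s.charAt(0);

  static List<Character> initialsOfLongNames(Stream<String> names) {
    return names
        .filter(IS_LONG)      // filter expects a Predicate
        .map(FIRST_CHAR)      // map expects a Function
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    // "Bernd" has only 5 characters and is filtered out.
    System.out.println(initialsOfLongNames(Stream.of("Alfred", "Bernd", "Christina")));
  }
}
```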


Stream<T>

Data pipeline
  1. source
  2. intermediate operations
  3. terminal operation
lazy
Operations pull the next element only when it is needed
Summarizing a 1 TB file ✔️
immutable
Methods return new streams
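
Laziness can be made visible by recording which elements actually flow through a pipeline. The following is a small sketch under assumed names (`LazyStreamSketch`, `pulledElements`, `withoutTerminalOperation` are not from the slides): an infinite source is safe because `limit` stops the pipeline, and without a terminal operation nothing runs at all.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class LazyStreamSketch {

  // Returns the elements that were actually pulled from the infinite source.
  static List<Integer> pulledElements() {
    List<Integer> pulled = new ArrayList<>();
    Stream.iterate(1, n -> n + 1)       // infinite source
        .peek(pulled::add)              // records every element that flows through
        .filter(n -> n % 2 == 0)
        .limit(2)                       // stops the pipeline after two matches
        .collect(Collectors.toList());  // terminal operation triggers the work
    return pulled;
  }

  // Without a terminal operation the intermediate operations never execute.
  static int withoutTerminalOperation() {
    List<Integer> pulled = new ArrayList<>();
    Stream.of(1, 2, 3).peek(pulled::add);
    return pulled.size();
  }

  public static void main(String[] args) {
    System.out.println(pulledElements());            // [1, 2, 3, 4]
    System.out.println(withoutTerminalOperation());  // 0
  }
}
```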

Source

Stream<String> names = Stream.of("Alfred", "Birgit");
LongStream ids = Arrays.stream(new long[]{42, 2, 37});
Stream<?> elements = collection.stream();
IntStream.range(0, 4);
    => [0, 1, 2, 3]
IntStream lottoNumbers = new Random().ints(1, 46);
Supplier<String> supplier = () -> "java";
Stream<String> javas = Stream.generate(supplier);
    => [java, java, java, java, ...]
Stream.iterate("-", s -> s + "-");
    => [-, --, ---, ----, -----, ...]
IntStream.iterate(1, i -> i * 10);
    => [1, 10, 100, 1_000, 10_000, ...]

"P0S 🐢".chars()   // IntStream
    => [80, 48, 83, 32, 55357, 56354]

IntStream of the char values (UTF-16 code units; the emoji takes two)
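
Because `chars()` yields UTF-16 code units, a character outside the Basic Multilingual Plane shows up as two values. If whole characters are wanted, `String.codePoints()` may be the better source. A short sketch (class and method names are assumptions; the emoji is written as an escaped surrogate pair to avoid encoding issues):

```java
import java.util.Arrays;

class CodePointSketch {

  static int[] codeUnits(String s) {
    return s.chars().toArray();       // one int per UTF-16 code unit
  }

  static int[] codePoints(String s) {
    return s.codePoints().toArray();  // one int per Unicode code point
  }

  public static void main(String[] args) {
    String text = "P0S \uD83D\uDC22"; // "P0S " followed by the turtle emoji
    System.out.println(Arrays.toString(codeUnits(text)));   // 6 values
    System.out.println(Arrays.toString(codePoints(text)));  // 5 values
  }
}
```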

try (Stream<String> lines = Files.lines(path)) { 
  lines. ...
}

Close streams that are backed by I/O resources!


Intermediate Operations

filter(predicate)
removes elements for which predicate.test() returns false

Stream.of("--", "-", "---")
    .filter(s -> s.length() > 1);   => [--, ---]
    
distinct()
removes duplicate elements

Stream.of(42, 37, 42)
    .distinct();    => [42, 37]
    

limit(long maxSize)
lets at most maxSize elements pass

Stream.iterate("-", s -> s + "-")
    .limit(3);  => [-, --, ---]
    
skip(long n)
skips the first n elements

IntStream.range(4, 9)
    .skip(2);   => [6, 7, 8]
    
sorted()
sorts the stream

new Random().ints(3, 0, 10)
    .sorted();  => e.g. [0, 1, 5]
    

map(function)
transforms every element with function

Stream.of("Alfred", "Bernd", "Christina")
    .map(s -> s.charAt(0));  => [A, B, C]
    
mapToXxx(function)
like map, but returns an XxxStream

Stream.of("Alfred", "Bernd", "Christina")
    .mapToInt(String::length);   => [6, 5, 9]
    
peek(consumer)
runs consumer on each element and passes the element on unchanged
useful for debugging

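Stream.iterate(100, n -> n > 0, n -> n / 10)   // three-argument iterate: Java 9+
    .peek(System.out::println)    // prints 100, 10, 1 once a terminal operation runs
    .forEach(n -> {});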
    

XxxStream only

mapToObj(function)
like map, but returns a Stream<T>

DoubleStream.of(3.14, 1.41, 2.71)
    .mapToInt(d -> (int) d)         => [3, 1, 2]
    .mapToObj(n -> "-".repeat(n));  => [---, -, --]   (String.repeat: Java 11+)
    

"P0S".chars()   // IntStream [80, 48, 83]
    .mapToObj(Character::toString);  => [P, 0, S]   (Character.toString(int): Java 11+)
    
boxed()
returns a Stream of the wrapper type, e.g. Stream<Integer>

IntStream.range(0,10)   // IntStream
    .boxed();       // Stream<Integer>
    

flatMap

Stream.of("3:2", "1:0", "0:2")  // Stream<String>
    .map(result -> {
      String[] splitted = result.split(":");
      return Arrays.stream(splitted);
    }); // Stream<Stream<String>> [[3, 2], [1, 0], [0, 2]]

Possible in principle, but often not what you want

Stream.of("3:2", "1:0", "0:2")  // Stream<String>
    .flatMap(result -> {
      String[] splitted = result.split(":");
      return Arrays.stream(splitted);
    }); // Stream<String> [3, 2, 1, 0, 0, 2]
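
Once the nested streams are flattened, the usual operations apply to the individual values. As an illustration, the total number of goals in the results above could be computed like this (a sketch; `FlatMapSketch` and `totalGoals` are hypothetical names):

```java
import java.util.Arrays;
import java.util.stream.Stream;

class FlatMapSketch {

  // Splits each "home:away" result and sums all goals.
  static int totalGoals(Stream<String> results) {
    return results
        .flatMap(result -> Arrays.stream(result.split(":")))  // Stream<String> of single numbers
        .mapToInt(Integer::parseInt)                          // IntStream
        .sum();
  }

  public static void main(String[] args) {
    System.out.println(totalGoals(Stream.of("3:2", "1:0", "0:2")));  // 8
  }
}
```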

Parallelization

IntStream.range(0, 3)  => [0, 1, 2]
    .parallel()     => [?, ?, ?]   (processing order is not fixed)
  • uses multiple CPU cores
  • often faster, but not always: splitting and merging cost time
  • sometimes problematic
  • stateful operations must synchronize state, e.g.:
    • sorted()
    • distinct()
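
The points above can be tried out with a small experiment. This is only a sketch with assumed names (`ParallelSketch` and its methods): the result of an associative reduction such as `sum()` does not depend on the processing order, `collect` keeps the encounter order of an ordered source, and only `forEach` exposes the undefined processing order.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class ParallelSketch {

  // The sum is the same regardless of how the work is split across threads.
  static int parallelSum() {
    return IntStream.range(0, 1000).parallel().sum();
  }

  // Collecting an ordered stream preserves encounter order, even in parallel.
  static List<Integer> collectedInOrder() {
    return IntStream.range(0, 10).parallel().boxed().collect(Collectors.toList());
  }

  public static void main(String[] args) {
    System.out.println(parallelSum());
    System.out.println(collectedInOrder());

    // forEach may print the elements in any order on a parallel stream.
    IntStream.range(0, 3).parallel().forEach(System.out::print);
    System.out.println();

    // forEachOrdered restores the encounter order (at some cost).
    IntStream.range(0, 3).parallel().forEachOrdered(System.out::print);
    System.out.println();
  }
}
```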

Terminal Operations

public interface Stream<T> extends BaseStream<T, Stream<T>> {

  long count();
  Optional<T> min(Comparator<? super T> comparator);
  Optional<T> max(Comparator<? super T> comparator);
  boolean anyMatch(Predicate<? super T> predicate);
  boolean allMatch(Predicate<? super T> predicate);
  boolean noneMatch(Predicate<? super T> predicate);
  Object[] toArray();
  ...
}
<A> A[] toArray(IntFunction<A[]> generator)

String[] subjects = Stream.of("POS", "AM", "TINF")
    .toArray(String[]::new);    => [POS, AM, TINF]
    

void forEach(consumer)
runs consumer for every element

"POS".chars()                               => [80, 79, 83]
    .mapToObj(i -> String.format("%d ", i)) => [80 , 79 , 83 ]
    .forEach(System.out::print);            => 80 79 83 
    
Optional<T> findAny() / findFirst()
returns one element; typically used after filter

Optional<String> optionalSubject = Stream.of("POS", "AM", "TINF")
    .filter(s -> s.length() > 2)
    .findFirst();   => Optional[POS]

Optional<String> optionalSubject = Stream.of("POS", "AM", "TINF")
    .parallel()
    .filter(s -> s.length() > 2)
    .findAny();     => Optional[POS] or Optional[TINF]
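
Both methods return an `Optional`, so the caller has to decide what happens when no element matches. One possible way to handle that, sketched with hypothetical names (`OptionalSketch`, `firstLongerThan`) and an assumed fallback value of "none":

```java
import java.util.Optional;
import java.util.stream.Stream;

class OptionalSketch {

  // Returns the first subject longer than minLength, or "none" if there is no match.
  static String firstLongerThan(int minLength, String... subjects) {
    Optional<String> match = Stream.of(subjects)
        .filter(s -> s.length() > minLength)
        .findFirst();
    return match.orElse("none");  // fallback value is an assumption for this sketch
  }

  public static void main(String[] args) {
    System.out.println(firstLongerThan(2, "POS", "AM", "TINF"));   // POS
    System.out.println(firstLongerThan(10, "POS", "AM", "TINF"));  // none
  }
}
```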

T reduce(T identity, accumulator)
Reduces a Stream<T> to a single object
identity: the initial value
accumulator(T result, T element): combines result and element into a new result

int product = IntStream.of(2, 3, 4)
    .reduce(1, (result, factor) -> result * factor);  => 24
    

try (Stream<String> lines = Files.lines(path)) {
  // no identity here: "" would put a "\n" in front of the first line
  String content = lines.reduce((result, line) -> result + "\n" + line)
      .orElse("");
}

OptionalInt optional___ = random.ints(0, 100)
    .limit(10)
    .reduce((r, e) -> r < e ? r : e);   => same result as .min()
    

XxxStream only

public interface IntStream extends BaseStream<Integer, IntStream> {

  int sum();
  OptionalInt min();
  OptionalInt max();
  OptionalDouble average();
  IntSummaryStatistics summaryStatistics();
}
public class IntSummaryStatistics implements IntConsumer {
    
  public final long getSum() { ... }
  public final int getMin() { ... }
  public final int getMax() { ... }
  public final double getAverage() { ... }
}
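
`summaryStatistics()` computes all of these values in a single pass over the stream. A brief usage sketch (the class and method names are illustrative, not from the slides):

```java
import java.util.IntSummaryStatistics;
import java.util.stream.Stream;

class SummarySketch {

  // Collects count, sum, min, max and average of the word lengths in one pass.
  static IntSummaryStatistics lengthStatistics(String... words) {
    return Stream.of(words)
        .mapToInt(String::length)
        .summaryStatistics();
  }

  public static void main(String[] args) {
    IntSummaryStatistics stats = lengthStatistics("Alfred", "Bernd", "Christina");
    System.out.println(stats.getMin());      // 5
    System.out.println(stats.getMax());      // 9
    System.out.println(stats.getSum());      // 20
    System.out.println(stats.getAverage());
  }
}
```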

.collect()

<R> R collect(Supplier<R> supplier,
              BiConsumer<R, ? super T> accumulator,
              BiConsumer<R, R> combiner);

similar to reduce, but the result type can be anything

Supplier<R> supplier
creates the result object
BiConsumer<R, ? super T> accumulator
is called for every element of the stream
BiConsumer<R, R> combiner
for parallel streams: each partial stream builds its own result object,
and combiner merges them

StringBuilder builder = IntStream.range(4, 10)
    .parallel()
    .collect(() -> new StringBuilder(),
        (result, i) -> result.append(i).append("th, "),
        (part1, part2) -> part1.append(part2));
4th, 5th, 6th, 7th, 8th, 9th, 
StringJoiner joiner = Stream.of("POS", "AM", "TINF")
    .collect(() -> new StringJoiner(", "),
        StringJoiner::add,
        StringJoiner::merge);
POS, AM, TINF
List<String> list = Stream.of("POS", "AM", "TINF")
    .parallel()
    .collect(ArrayList::new,
        Collection::add,
        Collection::addAll);

Collectors

simplify collect calls

String joined = Stream.of("POS", "AM", "TINF")
    .collect(Collectors.joining(", "));
POS, AM, TINF
List<String> list = Stream.of("POS", "AM", "TINF")
    .collect(Collectors.toList());
Set<String> set = Stream.of("POS", "AM", "TINF")
    .collect(Collectors.toSet());
SortedSet<String> sorted = Stream.of("POS", "AM", "TINF")
    .collect(Collectors.toCollection(TreeSet::new));

toMap(keyMapper, valueMapper)
Creates a Map; the mapper functions determine a key/value pair for each element

Map<Integer, String> map = Stream.of("POS", "AM", "TINF")
    .collect(Collectors.toMap(String::length,
        s -> s)); => {2=AM, 3=POS, 4=TINF}
toMap(..., mergeFunction)
mergeFunction resolves duplicate keys; without it they cause an exception

Map<Integer, String> map = Stream.of("POS", "AM", "TINF", "SYP")
    .collect(Collectors.toMap(String::length, s -> s));
Exception in thread "main" java.lang.IllegalStateException: 
    Duplicate key 3 (attempted merging values POS and SYP)
    

Map<Integer, String> map = Stream.of("POS", "AM", "TINF", "SYP")
    .collect(Collectors.toMap(String::length,
        s -> s,
        (old, e) -> e)); => {2=AM, 3=SYP, 4=TINF}

Map<Integer, List<String>> multimap = Stream.of("POS", "AM", "D", "SYP")
    .collect(Collectors.toMap(String::length,
        s -> new ArrayList<>(List.of(s)),   // List.of: Java 9+
        (part1, part2) -> {
          part1.addAll(part2);
          return part1;
        },
        TreeMap::new
    ));
{1=[D], 2=[AM], 3=[POS, SYP]}

groupingBy: similar to toMap

SortedMap<Integer, Long> grouped = IntStream.of(11, 20, 21, 30, 10, 12)
    .boxed()
    .collect(Collectors.groupingBy(i -> i % 10,  // Keys
        TreeMap::new,
        Collectors.counting()));                 // Values; Collector!
{0=3, 1=2, 2=1}
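
With only a classifier, `groupingBy` collects the values of each key into a `List`, which covers the same ground as the hand-written multimap built with `toMap`. A sketch under assumed names (`GroupingSketch`, `byLength`); the single-argument variant makes no promise about the concrete map type, so the result is copied into a `TreeMap` here to get sorted keys:

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class GroupingSketch {

  // Groups words by length; each key maps to the list of words of that length.
  static Map<Integer, List<String>> byLength(String... words) {
    Map<Integer, List<String>> grouped = Stream.of(words)
        .collect(Collectors.groupingBy(String::length));
    return new TreeMap<>(grouped);  // sorted copy, only for predictable printing
  }

  public static void main(String[] args) {
    System.out.println(byLength("POS", "AM", "D", "SYP"));  // {1=[D], 2=[AM], 3=[POS, SYP]}
  }
}
```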

Best practice

  • Variable: Collection/Array
  • Parameter: Collection/Array
  • Return value: Stream
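
One possible reading of these rules, sketched with hypothetical names (`BestPracticeSketch`, `upperCased`): data is held and passed around as a collection, while a method hands back a stream so that the caller decides how to continue or terminate the pipeline.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class BestPracticeSketch {

  // Parameter: a collection. Return value: a stream the caller can keep transforming.
  static Stream<String> upperCased(Collection<String> names) {
    return names.stream().map(String::toUpperCase);
  }

  public static void main(String[] args) {
    // Variable: a collection, not a stream (a stream can be consumed only once).
    List<String> names = Arrays.asList("Alfred", "Birgit");

    // The caller chooses the terminal operation.
    List<String> result = upperCased(names).collect(Collectors.toList());
    System.out.println(result);  // [ALFRED, BIRGIT]
  }
}
```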