Parallel Streams to jedna z najciekawszych funkcjonalności wprowadzonych w Java 8. Obiecują łatwe przyspiesznie kodu poprzez automatyczne wykorzystanie wszystkich rdzeni procesora. Ale czy zawsze warto z nich korzystać? W praktyce okazuje się, że parallel processing może być zarówno błogosławieństwem jak i przekleństwem.
Dlaczego Parallel Streams są ważne
W dzisiejszych czasach procesory wielordzeniowe są standardem – nawet laptopy mają 4-8 rdzeni. Tradycyjny kod Java wykorzystuje tylko jeden rdzeń, marnując potencjał sprzętu. Parallel Streams pozwalają bez skomplikowanego pisania wielowątkowego kodu wykorzystać całą moc procesora jedną metodą.
Co się nauczysz:
- Kiedy Parallel Streams rzeczywiście przyspieszają kod
- Jak mierzyć wydajność równoległego przetwarzania
- Jakie pułapki czyhają na programistów
- Praktyczne wzorce użycia w enterprise aplikacjach
- Alternatywy dla Parallel Streams
Jak działają Parallel Streams
Parallel Streams wykorzystują wewnętrznie ForkJoinPool – framework do dzielenia zadań na mniejsze części i łączenia wyników. Domyślnie używają puli o rozmiarze równym liczbie dostępnych procesorów minus jeden.
// Podstawowe użycie Parallel Stream Listnumbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); // Sequential stream - wykorzystuje jeden wątek int sequentialSum = numbers.stream() .mapToInt(n -> heavyComputation(n)) .sum(); // Parallel stream - wykorzystuje wszystkie dostępne rdzenie int parallelSum = numbers.parallelStream() .mapToInt(n -> heavyComputation(n)) .sum(); private static int heavyComputation(int n) { // Symulacja kosztownej operacji CPU try { Thread.sleep(100); // Uwaga: to NIE jest dobry przykład dla parallel! } catch (InterruptedException e) { Thread.currentThread().interrupt(); } return n * n; }
Kiedy Parallel Streams rzeczywiście przyspieszają
Parallel Streams nie są silver bullet. Przyspieszają kod tylko przy spełnieniu kilku warunków:
Praktyczny test wydajności
public class ParallelStreamBenchmark { public static void main(String[] args) { ListsmallList = generateNumbers(1000); List largeList = generateNumbers(1_000_000); // Test dla małej kolekcji benchmarkCollection("Small (1K)", smallList); // Test dla dużej kolekcji benchmarkCollection("Large (1M)", largeList); } private static void benchmarkCollection(String name, List numbers) { System.out.println("\n=== " + name + " ===" ); // Sequential long start = System.nanoTime(); long sequentialResult = numbers.stream() .filter(n -> isPrime(n)) .count(); long sequentialTime = System.nanoTime() - start; // Parallel start = System.nanoTime(); long parallelResult = numbers.parallelStream() .filter(n -> isPrime(n)) .count(); long parallelTime = System.nanoTime() - start; System.out.printf("Sequential: %d ms, result: %d%n", sequentialTime / 1_000_000, sequentialResult); System.out.printf("Parallel: %d ms, result: %d%n", parallelTime / 1_000_000, parallelResult); System.out.printf("Speedup: %.2fx%n", (double) sequentialTime / parallelTime); } private static boolean isPrime(int n) { if (n < 2) return false; for (int i = 2; i <= Math.sqrt(n); i++) { if (n % i == 0) return false; } return true; } private static List generateNumbers(int count) { return IntStream.range(1, count + 1) .boxed() .collect(Collectors.toList()); } }
Pułapki i zagrożenia bezpieczeństwa
Problem 1: Operacje I/O są zabójcze
// NIGDY tak nie rób - blokuje wszystkie wątki w puli! Listurls = Arrays.asList( "http://api.example1.com", "http://api.example2.com", "http://api.example3.com" ); // BŁĄD - Parallel Stream z I/O List responses = urls.parallelStream() .map(url -> { try { // HTTP call - blokuje wątek na sekundy! return httpClient.get(url).body(); } catch (Exception e) { return "Error: " + e.getMessage(); } }) .collect(Collectors.toList());
Problem 2: Race conditions przy mutowaniu stanu
// NIEBEZPIECZNE - race condition! Listresults = new ArrayList<>(); // NIE thread-safe! numbers.parallelStream() .filter(n -> isPrime(n)) .forEach(results::add); // Może prowadzić do data corruption! // POPRAWNIE - używaj thread-safe collectors List safeResults = numbers.parallelStream() .filter(n -> isPrime(n)) .collect(Collectors.toList()); // Thread-safe collector
Problem 3: Nieodpowiednie typy kolekcji
// DOBRE dla parallel - łatwe do podziału ArrayListarrayList = new ArrayList<>(); int[] array = new int[1000000]; IntStream.range(0, 1000000).toArray(); // ZŁSZE dla parallel - trudne do podziału LinkedList linkedList = new LinkedList<>(); Set hashSet = new HashSet<>(); // Benchmark różnych kolekcji benchmarkCollectionType("ArrayList", arrayList); benchmarkCollectionType("LinkedList", linkedList); benchmarkCollectionType("HashSet", hashSet);
Typ kolekcji | Parallel efficiency | Powód |
---|---|---|
ArrayList | Bardzo dobra | Random access, łatwy podział |
Array | Bardzo dobra | Memory locality, perfect splitting |
LinkedList | Słaba | Sequential access, kosztowny podział |
HashSet | Średnia | Brak porządku, unpredictable splitting |
TreeSet | Dobra | Sorted, balanced tree structure |
Monitoring i debugowanie Parallel Streams
public class ParallelStreamMonitoring { public static void monitorParallelExecution() { Listnumbers = IntStream.range(1, 10000) .boxed() .collect(Collectors.toList()); // Monitoring który wątek wykonuje operacje numbers.parallelStream() .filter(n -> { System.out.println("Processing " + n + " on thread: " + Thread.currentThread().getName()); return isPrime(n); }) .count(); // Sprawdzenie rozmiaru puli wątków int parallelism = ForkJoinPool.commonPool().getParallelism(); System.out.println("Available parallelism: " + parallelism); // Custom ForkJoinPool dla kontroli nad liczbą wątków ForkJoinPool customThreadPool = new ForkJoinPool(2); try { Long result = customThreadPool.submit(() -> numbers.parallelStream() .filter(ParallelStreamMonitoring::isPrime) .count() ).get(); System.out.println("Primes found with custom pool: " + result); } catch (Exception e) { e.printStackTrace(); } finally { customThreadPool.shutdown(); } } private static boolean isPrime(int n) { if (n < 2) return false; return IntStream.range(2, (int) Math.sqrt(n) + 1) .noneMatch(i -> n % i == 0); } }
Alternatywy dla Parallel Streams
CompletableFuture dla operacji I/O
// Zamiast Parallel Streams dla I/O - użyj CompletableFuture public class AsyncProcessing { private final ExecutorService executor = Executors.newFixedThreadPool(10); // Dedykowana pula dla I/O public ListfetchDataAsync(List urls) { List > futures = urls.stream() .map(url -> CompletableFuture.supplyAsync(() -> httpClient.get(url).body(), executor)) .collect(Collectors.toList()); return futures.stream() .map(CompletableFuture::join) .collect(Collectors.toList()); } }
Reactive Streams dla backpressure
// RxJava 2 - lepsze dla złożonych pipeline'ów Observable.fromIterable(numbers) .parallel(4) // Kontrola nad liczbą wątków .runOn(Schedulers.computation()) .filter(this::isPrime) .sequential() .toList() .subscribe(results -> System.out.println("Primes: " + results.size()));
Nie. Domyślnie używają Runtime.getRuntime().availableProcessors() - 1
wątków. Możesz to kontrolować przez custom ForkJoinPool lub system property java.util.concurrent.ForkJoinPool.common.parallelism
.
Najczęstsze przyczyny: za mała kolekcja (overhead > zysk), operacje I/O (blokują wątki), nieefektywny spliterator (LinkedList), lub zbyt prosta operacja (brak work to parallelize).
Technicznie tak, ale nie powinieneś. Zagnieżdzone parallel streams mogą prowadzić do deadlocków i znacznego spadku wydajności przez competition o wspólną pulę wątków.
Testy jednostkowe powinny sprawdzać poprawność wyników. Testy wydajnościowe uruchamiaj w środowisku podobnym do prod. Używaj JMH (Java Microbenchmark Harness) do rzetelnych pomiarów.
Standardowe collectory (toList(), toSet(), groupingBy()) są thread-safe. Problemy powstają przy custom collectorach lub mutowaniu zewnętrznych obiektów w lambdach.
Ostrożnie. W aplikacjach web (Spring MVC) już masz pool request-handling threads. Parallel Streams mogą konkurować o zasoby CPU. Lepsze jest często asynchroniczne przetwarzanie z dedykowaną pulą.
Używaj profilerów (JProfiler, YourKit), APM toolsów (AppDynamics, New Relic) i metryek JVM. Ważne są também latency percentiles (P95, P99), nie tylko średnia throughput.
Przydatne zasoby:
- Oracle Documentation – Stream API
- JEP 266: More Concurrency Updates (Java 9)
- JMH – Java Microbenchmark Harness
- Baeldung – When to Use Parallel Streams
🚀 Zadanie dla Ciebie
Benchmark Challenge: Stwórz program który porównuje wydajność sequential vs parallel streams dla różnych operacji: filtrowanie liczb pierwszych, sortowanie stringów, grupowanie obiektów. Przetestuj na kolekcjach o różnych rozmiarach (1K, 10K, 100K, 1M elementów). Zmierz i wykreśl wyniki – kiedy parallel streams dają rzeczywisty zysk w Twoim środowisku?