concurrency

Concurrency Utilities


Callable

Probleme mit Runnable:

  1. Keine Returnmöglichkeit
  1. Kein Werfen von Exceptions
  1. Abbruch schwierig
public interface Callable<V> {

    V call() throws Exception;
}

Motivation Threadpools

class Server {

    public static void main(String[] args) throws IOException {
        ServerSocket socket = new ServerSocket(9000);
        while (true) {
            final Socket s = socket.accept();
            new Thread(() -> doWork(s)).start();
        }
    }

    static void doWork(Socket s) {}
}

Für jeden Client mindestens folgende Arbeit:

  1. Thread erzeugen
  1. Thread ausführen
  1. Thread garbage collecten

Executor

public interface Executor {

    void execute(Runnable command);
}
Executor executor = ...
Runnable r = () -> foo();
executor.execute(r);
  • Task - Erzeugung getrennt von Task - Ausführung
  • Ausführung je nach Executor - Implementierung in
  1. neuen Thread
  1. vorhandenen Thread
  1. ausführendem Thread

ExecutorService

public interface ExecutorService extends Executor

wichtige Methoden:

Future<?> submit(Runnable task)
Übernimmt den Task zur Ausführung
Future<T> submit(Callable<T> task)
Übernimmt den Task zur Ausführung

Future

public interface Future<V> {

    boolean cancel(boolean mayInterruptIfRunning);
    boolean isCancelled();
    boolean isDone();
    V get() throws InterruptedException, ExecutionException;
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}
V get()
blockt, bis zugehörige call() / run() fertig liefert das Ergebnis von call() / null

Shutdown

void shutdown()
initiiert Shutdown; alte Tasks werden abgearbeitet, aber keine neuen akzeptiert
boolean awaitTermination(timeout)
blockt nach shutdown() bis alle Tasks
fertig -> true oder timeout -> false

Ablauf

interface ArchiveSearcher { 
    String search(String target); 
}

ExecutorService executor = ...
ArchiveSearcher searcher = ...
void showSearch(String target) throws InterruptedException {
    Callable<String> task = () -> searcher.search(target);
    Future<String> future = executor.submit(task);
    doOtherThings(); 
    try {
        displayText(future.get()); // Benutzen des Futures
    } catch (ExecutionException ex) { 
        cleanup(); 
    }
}
executor.shutdown();
executor.awaitTermination(1, TimeUnit.SECONDS);
System.out.println("Work finished, you can close the program now");

Executors

ExecutorService executor = Executors.newSingleThreadExecutor();

wichtige Methoden (returnen ExecutorService)

newSingleThreadExecutor()
Executor mit einem Thread
Bei Exceptions wird ein neuer erzeugt
newFixedThreadPool(int nThreads)
Executor mit n Threads
newCachedThreadPool()
Erzeugt bei Bedarf neue Threads
Unbenutzte Threads werden nach 60s gelöscht

Lock

public interface Lock {

    void lock();
    void lockInterruptibly() throws InterruptedException;
    boolean tryLock();
    boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
    void unlock();
    Condition newCondition();
}
lockInterruptibly
wie lock(), aber interruptbar -> InterruptedException
tryLock
blockt nicht
returnt ob Lock erhalten wurde

static int count = 0;

public static void main(String[] args) throws InterruptedException {
    ExecutorService executor = Executors.newFixedThreadPool(10);
    for (int i = 0; i < 10_000; i++)
        executor.submit(ReentrantLockDemo::increment);
    executor.shutdown();
    executor.awaitTermination(1, TimeUnit.DAYS);
    System.out.println(count);
}

static void increment() {
    count++;
}

private static final String lock = "Just for locking";

static void increment() {
    synchronized (lock) {
        count++;
    }
}
static Lock lock = new ReentrantLock();

static void increment() {
    lock.lock();
    try {
        count++;
    } finally {
        lock.unlock();
    }
}

ReentrantLock

ExecutorService executor = Executors.newFixedThreadPool(2);
ReentrantLock lock = new ReentrantLock();
executor.submit(() -> {
    lock.lock();
    try {
        sleep(1);
    } finally {
        lock.unlock();
    }});
executor.submit(() -> {
    System.out.println("Locked: " + lock.isLocked());
    System.out.println("Held by me: " + lock.isHeldByCurrentThread());
    boolean locked = lock.tryLock();
    System.out.println("Lock acquired: " + locked);
});
executor.shutdown();
Locked: true
Held by me: false
Lock acquired: false

ReadWriteLock

Stellt zwei Lock-Instanzen zur Verfügung

readLock
Kann von beliebig vielen Threads gehalten werden
writeLock
Kann nur von einem Thread gehalten werden
Blockt bis laufende Lesevorgänge beendet sind
Während das writeLock gehalten wird, werden keine readLocks vergeben

ExecutorService executor = Executors.newFixedThreadPool(3);
Map<String, String> map = new HashMap<>();
ReadWriteLock lock = new ReentrantReadWriteLock();
executor.submit(() -> {
    lock.writeLock().lock();
    try {
        sleep(1);
        map.put("foo", "bar");
    } finally {
        lock.writeLock().unlock();
}});
Runnable readTask = () -> {
    lock.readLock().lock();
    try {
        System.out.println(map.get("foo"));
    } finally {
        lock.readLock().unlock();
}};
executor.submit(readTask);
executor.submit(readTask);      // beide readTasks printen
executor.shutdown();

Semaphore

Ein Semaphore schützt einen kritischen Bereich, den maximal n Threads gleichzeitig betreten dürfen.

Ein Lock / Monitor ist ein binärer Semaphore.


ExecutorService executor = Executors.newFixedThreadPool(5);
Semaphore semaphore = new Semaphore(5); // 5 Pissoirs
Runnable goToToilet = () -> {
    try {
        semaphore.acquire();
        System.out.println("Semaphore acquired");
        pee();
    } catch (InterruptedException e) {
        e.printStackTrace();
    } finally {
        semaphore.release();
}};

Condition

synchronized(elevator) {
    while(burning)
        elevator.wait();
}
synchronized(elevator) {
    extinguishFire();
    elevator.notifyAll();
}

Lock lock = new ReentrantLock();
Condition fireExtinguished = lock.newCondition();
lock.lock();        // synchronized {
try {
    while(burning)
        fireExtinguished.await();   // elevator.wait()
} finally {
    lock.unlock();  // synchronized }
}
lock.lock();        // synchronized {
try {
    extinguishFire();
    fireExtinguished.signalAll();   // elevator.notify()
} finally {
    lock.unlock();  // synchronized }
}

Jede Condition hat eine Referenz auf das dazugehörige Lock


Fork / Join

  • Implementierung von ExecutorService
  • Task wird rekursiv in Subtasks aufgesplittet (fork)
  • Ist die Workload klein genug, wird die Task abgearbeitet
  • Dann werden die Teilergebnisse vereinigt (join)

Ablauf

public class MyRecursiveTask extends RecursiveTask<ReturnType> {

    @Override
    protected ReturnType compute() {
        if (my portion of the work is small enough)
          doTheWork();
        else {
          split my work into 2+ pieces
          fork most pieces
          invoke one
          combine results
        }
    }
}

RecursiveAction falls kein Returnwert nötig


public class NullCounterTask extends RecursiveTask<Integer> {

    private final List<?> list;
    private final int threshold;

    public NullCounterTask(Collection<?> coll, int threshold) {
        this.list = new ArrayList<>(coll);
        this.threshold = threshold;
    }
@Override
protected Integer compute() {
    if (sizeBelowThreshold())
        return countNulls();
    else {
        var cutIndex = list.size() / 2;
        var part1 = list.subList(0, cutIndex);
        var part2 = list.subList(cutIndex, list.size());
        var forkedTask = new NullCounterTask(part1, threshold).fork();
        int nullsInPart2 = new NullCounterTask(part2, threshold).invoke();
        return forkedTask.get() + nullsInPart2;
    }
}

Mehr

  • CyclicBarrier
  • Phaser
  • CountDownLatch
  • Concurrent Collections