Concurrency Utilities
Callable
Probleme mit Runnable:
- Keine Returnmöglichkeit
- Kein Werfen von Exceptions
- Abbruch schwierig
public interface Callable<V> {
V call() throws Exception;
}
Motivation Threadpools
class Server {
public static void main(String[] args) throws IOException {
ServerSocket socket = new ServerSocket(9000);
while (true) {
final Socket s = socket.accept();
new Thread(() -> doWork(s)).start();
}
}
static void doWork(Socket s) {}
}
Für jeden Client mindestens folgende Arbeit:
- Thread erzeugen
- Thread ausführen
- Thread garbage collecten
Executor
public interface Executor {
void execute(Runnable command);
}
Executor executor = ...
Runnable r = () -> foo();
executor.execute(r);
- Task - Erzeugung getrennt von Task - Ausführung
- Ausführung je nach Executor - Implementierung in
- neuen Thread
- vorhandenen Thread
- ausführendem Thread
ExecutorService
public interface ExecutorService extends Executor
wichtige Methoden:
Future<?> submit(Runnable task)- Übernimmt den Task zur Ausführung
Future<T> submit(Callable<T> task)- Übernimmt den Task zur Ausführung
Future
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
V get()-
blockt, bis zugehörige
call()/run()fertig liefert das Ergebnis voncall()/null
Shutdown
void shutdown()- initiiert Shutdown; alte Tasks werden abgearbeitet, aber keine neuen akzeptiert
boolean awaitTermination(timeout)- blockt nach
shutdown()bis alle Tasks - fertig ->
trueoder timeout ->false
Ablauf
interface ArchiveSearcher {
String search(String target);
}
ExecutorService executor = ...
ArchiveSearcher searcher = ...
void showSearch(String target) throws InterruptedException {
Callable<String> task = () -> searcher.search(target);
Future<String> future = executor.submit(task);
doOtherThings();
try {
displayText(future.get()); // Benutzen des Futures
} catch (ExecutionException ex) {
cleanup();
}
}
executor.shutdown();
executor.awaitTermination(1, TimeUnit.SECONDS);
System.out.println("Work finished, you can close the program now");
Executors
ExecutorService executor = Executors.newSingleThreadExecutor();
wichtige Methoden (returnen ExecutorService)
newSingleThreadExecutor()- Executor mit einem Thread
- Bei Exceptions wird ein neuer erzeugt
newFixedThreadPool(int nThreads)- Executor mit n Threads
newCachedThreadPool()- Erzeugt bei Bedarf neue Threads
- Unbenutzte Threads werden nach 60s gelöscht
Lock
public interface Lock {
void lock();
void lockInterruptibly() throws InterruptedException;
boolean tryLock();
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
void unlock();
Condition newCondition();
}
lockInterruptibly- wie
lock(), aber interruptbar ->InterruptedException tryLock- blockt nicht
- returnt ob Lock erhalten wurde
static int count = 0;
public static void main(String[] args) throws InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(10);
for (int i = 0; i < 10_000; i++)
executor.submit(ReentrantLockDemo::increment);
executor.shutdown();
executor.awaitTermination(1, TimeUnit.DAYS);
System.out.println(count);
}
static void increment() {
count++;
}
private static final String lock = "Just for locking";
static void increment() {
synchronized (lock) {
count++;
}
}
static Lock lock = new ReentrantLock();
static void increment() {
lock.lock();
try {
count++;
} finally {
lock.unlock();
}
}
ReentrantLock
ExecutorService executor = Executors.newFixedThreadPool(2);
ReentrantLock lock = new ReentrantLock();
executor.submit(() -> {
lock.lock();
try {
sleep(1);
} finally {
lock.unlock();
}});
executor.submit(() -> {
System.out.println("Locked: " + lock.isLocked());
System.out.println("Held by me: " + lock.isHeldByCurrentThread());
boolean locked = lock.tryLock();
System.out.println("Lock acquired: " + locked);
});
executor.shutdown();
Locked: true
Held by me: false
Lock acquired: false
ReadWriteLock
Stellt zwei Lock-Instanzen zur Verfügung
readLock- Kann von beliebig vielen Threads gehalten werden
writeLock- Kann nur von einem Thread gehalten werden
- Blockt bis laufende Lesevorgänge beendet sind
- Während das
writeLockgehalten wird, werden keinereadLocksvergeben
ExecutorService executor = Executors.newFixedThreadPool(3);
Map<String, String> map = new HashMap<>();
ReadWriteLock lock = new ReentrantReadWriteLock();
executor.submit(() -> {
lock.writeLock().lock();
try {
sleep(1);
map.put("foo", "bar");
} finally {
lock.writeLock().unlock();
}});
Runnable readTask = () -> {
lock.readLock().lock();
try {
System.out.println(map.get("foo"));
} finally {
lock.readLock().unlock();
}};
executor.submit(readTask);
executor.submit(readTask); // beide readTasks printen
executor.shutdown();
Semaphore
Ein Semaphore schützt einen kritischen Bereich, den maximal n Threads gleichzeitig betreten dürfen.
Ein Lock / Monitor ist ein binärer Semaphore.
ExecutorService executor = Executors.newFixedThreadPool(5);
Semaphore semaphore = new Semaphore(5); // 5 Pissoirs
Runnable goToToilet = () -> {
try {
semaphore.acquire();
System.out.println("Semaphore acquired");
pee();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release();
}};
Condition
synchronized(elevator) {
while(burning)
elevator.wait();
}
synchronized(elevator) {
extinguishFire();
elevator.notifyAll();
}
Lock lock = new ReentrantLock();
Condition fireExtinguished = lock.newCondition();
lock.lock(); // synchronized {
try {
while(burning)
fireExtinguished.await(); // elevator.wait()
} finally {
lock.unlock(); // synchronized }
}
lock.lock(); // synchronized {
try {
extinguishFire();
fireExtinguished.signalAll(); // elevator.notify()
} finally {
lock.unlock(); // synchronized }
}
Jede Condition hat eine Referenz auf das dazugehörige Lock
Fork / Join
- Implementierung von
ExecutorService - Task wird rekursiv in Subtasks aufgesplittet (fork)
- Ist die Workload klein genug, wird die Task abgearbeitet
- Dann werden die Teilergebnisse vereinigt (join)
Ablauf
public class MyRecursiveTask extends RecursiveTask<ReturnType> {
@Override
protected ReturnType compute() {
if (my portion of the work is small enough)
doTheWork();
else {
split my work into 2+ pieces
fork most pieces
invoke one
combine results
}
}
}
RecursiveAction falls kein Returnwert nötig
public class NullCounterTask extends RecursiveTask<Integer> {
private final List<?> list;
private final int threshold;
public NullCounterTask(Collection<?> coll, int threshold) {
this.list = new ArrayList<>(coll);
this.threshold = threshold;
}
@Override
protected Integer compute() {
if (sizeBelowThreshold())
return countNulls();
else {
var cutIndex = list.size() / 2;
var part1 = list.subList(0, cutIndex);
var part2 = list.subList(cutIndex, list.size());
var forkedTask = new NullCounterTask(part1, threshold).fork();
int nullsInPart2 = new NullCounterTask(part2, threshold).invoke();
return forkedTask.get() + nullsInPart2;
}
}
Mehr
CyclicBarrierPhaserCountDownLatchConcurrent Collections