V Jave vieme ľahko riešiť paralelné úlohy, to je jasné. Vieme však pohodlne programovať úlohy, kde spustíme paralelne viacero úloh, a výsledky z nich spracovávame postupne v takom poradí, v akom sa ich darí dokončovať.
Ukážme si to na príklade spočítavania súhrnnej veľkostí adresárov (a to rekurzívne). Paralelne začneme spočítavať, povedzme, štyri adresáre na troch vláknach, a uvidíme, že menšie adresáre sa spracujú rýchlejšie. Ak by sme použili napríklad ExecutorService#invokeAll()
, sčítavanie veľkostí adresárov by sa mohlo spustiť až po dobehnutí všetkých úloh.
Namiesto toho môžeme použiť ExecutorCompletionService
, ktorý dokáže vykonávať úlohy paralelne, ale zároveň postupne spracovávať výsledky.
Na úlohu budeme potrebovať:
- objekt, ktorý asynchrónne zráta veľkosť jedného adresára.
- pomocný objekt, visitor, ktorý bude navštevovať súbory a adresáre a sčítavať ich veľkosti
- triedu, v ktorej použijeme konkurentnú mašinériu
Zrátavenie veľkostí adresárov
V Jave 7 už nemusíme rekurzívne behať po adresároch. Máme API Files#walkFileTree()
, ktoré pobehá po adresárovej štruktúre, a bude od nás očakávať implementáciu objektu, ktorého metóda sa zavolá pre každý adresár či súbor. (Áno, je to návrhový vzor visitor v praxi).
Ukážka použitia?
SizeSummingFileVisitor visitor = new SizeSummingFileVisitor();
Files.walkFileTree(folder.toPath(), visitor);
Visitor
SizeSummingFileVisitor
je naša trieda, ktorá vyzerá nasledovne:
package sk.upjs.ics.novotnyr.akka;
import java.io.IOException;
import java.nio.file.FileVisitResult;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
public class SizeSummingFileVisitor extends SimpleFileVisitor<Path> {
private long totalSize;
@Override
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
if(attrs.isRegularFile()) {
totalSize += attrs.size();
}
return super.visitFile(file, attrs);
}
public long getTotalSize() {
return totalSize;
}
}
Pre každý nájdený súbor začneme spočítavať celkovú veľkosť, a to len v prípade, že ide naozaj o regulárny súbor.
Úloha pre spočítavanie
Samotná úloha, ktorá spočíta veľkosť v duchu du -S
, bude implementovať interfejs Callable
, aby sme ju mohli spustiť na pozadí a zároveň vrátiť výsledok (typu Long
, veľkosti adresárov sú totiž naozaj longové
)
import java.io.File;
import java.nio.file.Files;
import java.util.concurrent.Callable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class FolderSizeCounter implements Callable<Long> {
private static final Logger logger = LoggerFactory.getLogger(FolderSizeCounter.class);
private File folder;
public FolderSizeCounter(File folder) {
this.folder = folder;
}
public Long call() throws Exception {
logger.info("Counting " + folder);
SizeSummingFileVisitor visitor = new SizeSummingFileVisitor();
Files.walkFileTree(folder.toPath(), visitor);
return visitor.getTotalSize();
}
}
Asynchrónne rátanie
Filozofia asynchrónneho rátania a vyberania výsledkov začne vytvorením objektu ExecutorCompletionService
, do ktorého budeme posielať výsledky. No dobre, to nie je úplne prvý krok, pretože táto služba nefunguje sama od seba. Potrebuje získať thread pool (zásobáreň vlákien ;-)), ktorý bude poskytovať vlákna pre paralelné vykonávanie úloh. A ako vieme, thread pooly sú reprezentované implementáciami interfejsu ExecutorService
.
ExecutorService threadPool = Executors.newFixedThreadPool(2);
ExecutorCompletionService<Long> executorCompletionService = new ExecutorCompletionService<Long>(threadPool);
Ak máme executorCompletionService
, môžeme doňho radostne odosielať úlohy vykonateľné na pozadí, teda inštanciue našich Callable
objektov. Výpočet veľkosti jedného adresára môžeme zaslať nasledovne:
executorCompletionService.submit(new FolderSizeCounter(new File(folderName)));
Potom, čo zašleme všetky úlohy, môžeme thread pool uzavrieť cez threadPool.shutdown()
. Ak by sme to nespravili, stále by čakal na prípadné budúce úlohy, a aplikácia by „odvisla“.
Vyzdvihnutie úloh
Na začiatku som spomínal, že ExecutorCompletionService
umožňuje vyťahovať výsledky úloh ihneď, ako sú k dispozícii. Metóda take()
blokuje dovtedy, kým sa nedokončí niektorá z úloh, a kým sa nevráti výsledok. Návratovou hodnotou je síce Future
, ale samotný výsledok možno následne okamžite vytiahnuť cez Future#get()
.
Future<Long> futureSize = executorCompletionService.take();
long size = futureSize.get();
Takýmto spôsobom možno postupne vyťahovať toľko výsledkov, koľko sa ich priebežne vypočítava, a pri volaní take()
sa blokuje vždy len dovtedy, kým ExecutorCompletionService
nedopočíta ďalší výsledok.
Nižšie je ukázaný celý kód fungujúci na štyroch adresároch a thread poole s dvomi vláknami.
09:57:10.881 [pool-1-thread-1] INFO s.u.i.n.akka.FolderSizeCounter - Counting /opt
09:57:10.881 [pool-1-thread-2] INFO s.u.i.n.akka.FolderSizeCounter - Counting /tmp
09:57:10.897 [pool-1-thread-1] INFO s.u.i.n.akka.FolderSizeCounter - Counting /etc
09:57:10.897 [main] INFO s.u.i.novotnyr.akka.JavaFutureRunner - Partial size: 18811567
09:57:11.126 [pool-1-thread-1] INFO s.u.i.n.akka.FolderSizeCounter - Counting /home
09:57:11.126 [main] INFO s.u.i.novotnyr.akka.JavaFutureRunner - Partial size: 699449201
09:57:11.204 [main] INFO s.u.i.novotnyr.akka.JavaFutureRunner - Partial size: 273276553
09:57:12.499 [main] INFO s.u.i.novotnyr.akka.JavaFutureRunner - Partial size: 7904243633
Total size 8895780954
Všimnite si, ako sa čiastočné výsledky objavujú skôr, než sa dopočítajú výsledky.
Celý kód
import java.io.File;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class JavaFutureRunner {
private static final Logger logger = LoggerFactory.getLogger(JavaFutureRunner.class);
public static void main(String[] args) throws Exception {
String[] folderNames = { "/opt", "/tmp", "/etc", "/home" };
ExecutorService threadPool = Executors.newFixedThreadPool(2);
ExecutorCompletionService<Long> executorCompletionService = new ExecutorCompletionService<Long>(threadPool);
for (String folderName : folderNames) {
executorCompletionService.submit(new FolderSizeCounter(new File(folderName)));
}
threadPool.shutdown();
long totalSize = 0;
for (int i = 0; i < folderNames.length; i++) {
Future<Long> futureSize = executorCompletionService.take();
long size = futureSize.get();
logger.info("Partial size: " + size);
totalSize += size;
}
System.out.println("Total size " + totalSize);
}
}