Počítanie veľkostí adresárov s `ExecutorCompletionService`

V Jave vieme ľahko riešiť paralelné úlohy, to je jasné. Vieme však pohodlne programovať úlohy, kde spustíme paralelne viacero úloh, a výsledky z nich spracovávame postupne v takom poradí, v akom sa ich darí dokončovať.

Ukážme si to na príklade spočítavania súhrnnej veľkostí adresárov (a to rekurzívne). Paralelne začneme spočítavať, povedzme, štyri adresáre na troch vláknach, a uvidíme, že menšie adresáre sa spracujú rýchlejšie. Ak by sme použili napríklad ExecutorService#invokeAll(), sčítavanie veľkostí adresárov by sa mohlo spustiť až po dobehnutí všetkých úloh.

Namiesto toho môžeme použiť ExecutorCompletionService, ktorý dokáže vykonávať úlohy paralelne, ale zároveň postupne spracovávať výsledky.

Na úlohu budeme potrebovať:

  • objekt, ktorý asynchrónne zráta veľkosť jedného adresára.
  • pomocný objekt, visitor, ktorý bude navštevovať súbory a adresáre a sčítavať ich veľkosti
  • triedu, v ktorej použijeme konkurentnú mašinériu

Zrátavenie veľkostí adresárov

V Jave 7 už nemusíme rekurzívne behať po adresároch. Máme API Files#walkFileTree(), ktoré pobehá po adresárovej štruktúre, a bude od nás očakávať implementáciu objektu, ktorého metóda sa zavolá pre každý adresár či súbor. (Áno, je to návrhový vzor visitor v praxi).

Ukážka použitia?

SizeSummingFileVisitor visitor = new SizeSummingFileVisitor();
Files.walkFileTree(folder.toPath(), visitor);

Visitor

SizeSummingFileVisitor je naša trieda, ktorá vyzerá nasledovne:

package sk.upjs.ics.novotnyr.akka;

import java.io.IOException;
import java.nio.file.FileVisitResult;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;

public class SizeSummingFileVisitor extends SimpleFileVisitor<Path> {
    private long totalSize;

    @Override
    public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
        if(attrs.isRegularFile()) {
            totalSize += attrs.size();
        }

        return super.visitFile(file, attrs);
    }

    public long getTotalSize() {
        return totalSize;
    }
}

Pre každý nájdený súbor začneme spočítavať celkovú veľkosť, a to len v prípade, že ide naozaj o regulárny súbor.

Úloha pre spočítavanie

Samotná úloha, ktorá spočíta veľkosť v duchu du -S, bude implementovať interfejs Callable, aby sme ju mohli spustiť na pozadí a zároveň vrátiť výsledok (typu Long, veľkosti adresárov sú totiž naozaj longové)

import java.io.File;
import java.nio.file.Files;
import java.util.concurrent.Callable;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FolderSizeCounter implements Callable<Long> {

    private static final Logger logger = LoggerFactory.getLogger(FolderSizeCounter.class);

    private File folder;

    public FolderSizeCounter(File folder) {
        this.folder = folder;
    }

    public Long call() throws Exception {
        logger.info("Counting " + folder);

        SizeSummingFileVisitor visitor = new SizeSummingFileVisitor();
        Files.walkFileTree(folder.toPath(), visitor);

        return visitor.getTotalSize();
    }

}

Asynchrónne rátanie

Filozofia asynchrónneho rátania a vyberania výsledkov začne vytvorením objektu ExecutorCompletionService, do ktorého budeme posielať výsledky. No dobre, to nie je úplne prvý krok, pretože táto služba nefunguje sama od seba. Potrebuje získať thread pool (zásobáreň vlákien ;-)), ktorý bude poskytovať vlákna pre paralelné vykonávanie úloh. A ako vieme, thread pooly sú reprezentované implementáciami interfejsu ExecutorService.

ExecutorService threadPool = Executors.newFixedThreadPool(2);
ExecutorCompletionService<Long> executorCompletionService = new ExecutorCompletionService<Long>(threadPool);

Ak máme executorCompletionService, môžeme doňho radostne odosielať úlohy vykonateľné na pozadí, teda inštanciue našich Callable objektov. Výpočet veľkosti jedného adresára môžeme zaslať nasledovne:

executorCompletionService.submit(new FolderSizeCounter(new File(folderName)));

Potom, čo zašleme všetky úlohy, môžeme thread pool uzavrieť cez threadPool.shutdown(). Ak by sme to nespravili, stále by čakal na prípadné budúce úlohy, a aplikácia by „odvisla“.

Vyzdvihnutie úloh

Na začiatku som spomínal, že ExecutorCompletionService umožňuje vyťahovať výsledky úloh ihneď, ako sú k dispozícii. Metóda take() blokuje dovtedy, kým sa nedokončí niektorá z úloh, a kým sa nevráti výsledok. Návratovou hodnotou je síce Future, ale samotný výsledok možno následne okamžite vytiahnuť cez Future#get().

Future<Long> futureSize = executorCompletionService.take();
long size = futureSize.get();

Takýmto spôsobom možno postupne vyťahovať toľko výsledkov, koľko sa ich priebežne vypočítava, a pri volaní take() sa blokuje vždy len dovtedy, kým ExecutorCompletionService nedopočíta ďalší výsledok.

Nižšie je ukázaný celý kód fungujúci na štyroch adresároch a thread poole s dvomi vláknami.

09:57:10.881 [pool-1-thread-1] INFO  s.u.i.n.akka.FolderSizeCounter - Counting /opt
09:57:10.881 [pool-1-thread-2] INFO  s.u.i.n.akka.FolderSizeCounter - Counting /tmp
09:57:10.897 [pool-1-thread-1] INFO  s.u.i.n.akka.FolderSizeCounter - Counting /etc
09:57:10.897 [main] INFO  s.u.i.novotnyr.akka.JavaFutureRunner - Partial size: 18811567
09:57:11.126 [pool-1-thread-1] INFO  s.u.i.n.akka.FolderSizeCounter - Counting /home
09:57:11.126 [main] INFO  s.u.i.novotnyr.akka.JavaFutureRunner - Partial size: 699449201
09:57:11.204 [main] INFO  s.u.i.novotnyr.akka.JavaFutureRunner - Partial size: 273276553
09:57:12.499 [main] INFO  s.u.i.novotnyr.akka.JavaFutureRunner - Partial size: 7904243633
Total size 8895780954

Všimnite si, ako sa čiastočné výsledky objavujú skôr, než sa dopočítajú výsledky.

Celý kód

import java.io.File;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JavaFutureRunner {

    private static final Logger logger = LoggerFactory.getLogger(JavaFutureRunner.class);

    public static void main(String[] args) throws Exception {
        String[] folderNames = { "/opt", "/tmp", "/etc", "/home" };

        ExecutorService threadPool = Executors.newFixedThreadPool(2);
        ExecutorCompletionService<Long> executorCompletionService = new ExecutorCompletionService<Long>(threadPool);
        for (String folderName : folderNames) {
            executorCompletionService.submit(new FolderSizeCounter(new File(folderName)));
        }
        threadPool.shutdown();

        long totalSize = 0;
        for (int i = 0; i < folderNames.length; i++) {
            Future<Long> futureSize = executorCompletionService.take();
            long size = futureSize.get();

            logger.info("Partial size: " + size);

            totalSize += size;
        }

        System.out.println("Total size " + totalSize);
    }
}

Pridaj komentár

Vaša e-mailová adresa nebude zverejnená. Vyžadované polia sú označené *