Tuesday, September 16, 2014

Java 8'in Çok Çekirdekli Programlama Başarımı

Java 8'in, Mart 2014'de duyurulmasının ardından 6 ay geçti. Şu an JDK'nın en güncel sürümü 8u20. Artık platform olgunlaşmaya başladı. Başarımı test etmenin tam zamanı. Java 8 ile pek çok yenilik ile tanıştık. Detaylar için bu makaleye göz atmanızda fayda var. Java 8'deki yeniliklerin ortak hedefi, çok çekirdekli platformlarda yüksek başarım elde etmek. Uygulama başarımı tek boyutlu bir büyüklük olarak ölçülemez. Birim zamanda çıkarılan iş, yanıt süresi, bellek ayak izi gibi birden fazla ölçüte bakmak gerekebilir. Üstelik bu ölçütler arasında bir ödünleşim (=trade-off) de vardır. Hepsini birden iyileştirmek mümkün olmayabilir.  Bu yazıda, Java 8 platformu üzerinde veri paralelliği farklı yöntemlerle gerçeklenecek ve karşılaştırması yapılacaktır. Temel olarak üç farklı yöntem izlenecektir:
I. Callable İplikler (=Threads), Future ve Executor Kullanımı
Runnable ipliklerin en kötü yönü run() metodunun herhangi bir parametre alamaması ve ipliğin bir değer döndürememesidir. Bu yüzden görevi yaratan iplik ile sonucu üretecek iplik arasında mutlaka bir senkronizasyon kurmak gerekir. Java SE 5 ile gelen Callable arayüzü ile artık sonucu call() metodundan dönmek ve Future arayüzünün get() çağrısı ile bu sonuca asenkron bir şekilde ulaşmak mümkündür. Java SE 5 ile gelen yeniliklerden biri de İplik havuzudur (=Thread Pool). Farklı havuz türleri ihtiyaca göre seçilebilir: Her zaman  sabit sayıda ipliğin yer aldığı Fixed Thread Pool, Parlamalı çalışma modu için Cached Thread Pool, Yığın işler için Single Thread Pool, Periyodik işler için Scheduled Thread Pool. Ancak buradaki tüm yapılar alt düzeydedir. Bu yüzden bu yapı üzerinde uygulama geliştirmek vakit alır, test etmesi zordur, yarış durumları (=race conditions) ve ölümcül kilitlenme (=deadlock) gibi durumların iyi düşünülmüş olması gerekir.
II. Fork/Join Çatısısnın (=Framework) Kullanımı
Java SE 7 ile gelen Fork/Join çatısı temelde İş Çalma (=Work Stealing) algoritmasına dayanır. Java 7'deki Fork/Join çatısı kullanımına ilişkin detayları bu yazıdan okuyabilirsiniz. Fork/Join çatısı I'e göre kodlaması daha kolay olsa da hala alt düzey bir API sunmaktadır. Üstelik problemi iki alt parçaya ayırmak ile seri olarak çözümü arasında genellikle veri boyutu üzerinden verilmesi gereken bir karar vardır.
III. Dönüştür-İndirge (=Map-Reduce) Çatısının Kullanımı
Dönüştür-İndirge çatısı Java SE 8 ile gelmiştir ve alt tarafta Fork/Join çatısını kullanır. Collection API ile hazır olarak gelen paralel kaplar ve Dönüştür-İndirge çerçevesi kullanılarak, çekirdek sayısına göre ölçeklenebilir çözümlere hızlıca ulaşabilir.

Örnek Problemin Tanımı

Karşılaştırmada seçilen problem uzayını tanıyalım:
public class Programmer implements Serializable {

    private int id;
    private String name;
    private String surname;
    private int age;
    private ProgrammingLanguage programmingLanguage;
    . . .
}

public class ProgrammingLanguage implements Serializable {

    private String name;
    private int level;
    . . .
}

Önce 60 milyon Programmer sınıfından rastgele nesne yaratıp bir ArrayList'in içine atıyoruz. Amacımız bu listenin içinden 40 yaşın üstünde "en bilgili" Java programcısını bulmak. Bunun için yukarıda tanımladığımız üç yaklaşımı da kullanacağız ve süreleri karşılaştıracağız.

Seri Çözüm

Çözümün doğruluğunu sınamak ve hızlanmayı karşılaştırabilmek için problemi seri olarak çözmek uygun olur:

private static long serialSolve(List<Programmer> list) {
        long start = System.nanoTime();
        Programmer oldest = null;
        for (Programmer programmer : list) {
            if (programmer.getAge() > 40 && programmer.getProgrammingLanguage().getName().equalsIgnoreCase("java")) {
                if (oldest == null) {
                    oldest = programmer;
                } else if (programmer.getProgrammingLanguage().getLevel() > oldest.getProgrammingLanguage().getLevel()) {
                    oldest = programmer;
                }
            }
        }
        System.err.println("Oldest [Serial]: " + oldest);
        long stop = System.nanoTime();
        return (stop - start);
    }

Callable İplik ve Future Kullanımı

private static long parallelSolveCallableThread(List<Programmer> list) throws InterruptedException {
        long start = System.nanoTime();
        Programmer oldest = null;
        int numberOfSegments = DATA_SIZE / SERIAL_THRESHOLD;
        ExecutorService es = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
        List<Callable<Programmer>> callables = new ArrayList<>(numberOfSegments);
        for (int i = 0, j = 0; i < numberOfSegments; ++i, j += SERIAL_THRESHOLD) {
            callables.add(new ProcessingThread(j, SERIAL_THRESHOLD, list));
        }
        List<Future<Programmer>> partialSolutions = es.invokeAll(callables);

        try {
            oldest = partialSolutions.get(0).get();
            for (int i = 1; i < partialSolutions.size(); ++i) {
                Programmer programmer = partialSolutions.get(i).get();
                if (programmer.getProgrammingLanguage().getLevel() > oldest.getProgrammingLanguage().getLevel()) {
                    oldest = programmer;
                }
            }
        } catch (InterruptedException | ExecutionException ex) {
            Logger.getLogger(PerformanceTest.class.getName()).log(Level.SEVERE, null, ex);
        }
        System.err.println("Oldest [Callable Thread]: " + oldest);
        long stop = System.nanoTime();
        es.shutdown();
        return (stop - start);
    }

public class ProcessingThread implements Callable<Programmer> {

    private final int start;
    private final int length;
    private final List<Programmer> list;

    public ProcessingThread(int start, int length, List<Programmer> list) {
        this.start = start;
        this.length = length;
        this.list = list;
    }

    @Override
    public Programmer call() throws Exception {
        Programmer oldest = list.get(start);
        Programmer programmer;
        for (int i = start + 1, j = 1; j < length; ++j, ++i) {
            programmer = list.get(i);
            if (programmer.getAge() > 40 && programmer.getProgrammingLanguage().getName().equalsIgnoreCase("java")) {
                if (programmer.getProgrammingLanguage().getLevel() > oldest.getProgrammingLanguage().getLevel()) {
                    oldest = programmer;
                }
            }
        }
        return oldest;
    }

}

Fork/Join Çatısının Kullanımı

private static long parallelSolveForkJoin(List<Programmer> list) {
        long start = System.nanoTime();
        ForkJoinPool pool = new ForkJoinPool();
        ForkListProcessing flp = new ForkListProcessing(0, list.size(), list);
        Programmer oldest = pool.invoke(flp);
        System.err.println("Oldest [FJ]: " + oldest);
        long stop = System.nanoTime();
        return (stop - start);
    }

public class ForkListProcessing extends RecursiveTask<Programmer> {

    private final int start;
    private final int length;
    private final List<Programmer> list;
    private static final int SERIAL_THRESHOLD = 5_000_000;

    public ForkListProcessing(int start, int length, List<Programmer> list) {
        this.start = start;
        this.length = length;
        this.list = list;
    }

    @Override
    public Programmer compute() {
        if (length <= SERIAL_THRESHOLD) {
            return serialSolver();
        } else {
            int startLeft = start;
            int lengthLeft = length / 2;
            int startRight = start + lengthLeft;
            int lengthRight = length - lengthLeft;
            RecursiveTask<Programmer> left = new ForkListProcessing(startLeft, lengthLeft, list);
            RecursiveTask<Programmer> right = new ForkListProcessing(startRight, lengthRight, list);
            left.fork();
            right.fork();
            Programmer leftSolution = left.join();
            Programmer rightSolution = right.join();
            if (leftSolution.getProgrammingLanguage().getLevel() >= rightSolution.getProgrammingLanguage().getLevel()) {
                return leftSolution;
            } else {
                return rightSolution;
            }
        }
    }

    private Programmer serialSolver() {
        Programmer oldest = list.get(start);
        Programmer programmer;
        for (int i = start + 1, j = 1; j < length; ++j, ++i) {
            programmer = list.get(i);
            if (programmer.getAge() > 40 && programmer.getProgrammingLanguage().getName().equalsIgnoreCase("java")) {
                if (programmer.getProgrammingLanguage().getLevel() > oldest.getProgrammingLanguage().getLevel()) {
                    oldest = programmer;
                }
            }
        }
        return oldest;
    }

}


Dönüştür-İndirge Çatısının Kullanımı

private static long parallelSolveMapReduce(List<Programmer> list) {
        long start = System.nanoTime();
        Programmer oldest = list.parallelStream()
                .filter((Programmer prg) -> prg.getAge() > 40 && prg.getProgrammingLanguage().getName().equalsIgnoreCase("java"))
                .reduce((left, right) -> left.getProgrammingLanguage().getLevel() >= right.getProgrammingLanguage().getLevel() ? left : right).get();

        System.err.println("Oldest [MapReduce]: " + oldest);
        long stop = System.nanoTime();
        return (stop - start);
    }

Deneysel Sonuçlar

Örnek uygulama 4 fiziksel çekirdeğin ve 8 sanal işlemcinin olduğu i7 işlemcili bir dizüstü bilgisayarda (Asus N56VZ-S4255H), Hotspot Java Sanal Makinası üzerinde koşturulmuştur. Kullanılan Hotspot parametreleri aşağıdaki gibidir: -Xms5096m -Xmx6096m -XX:+UseG1GC -Xcomp.
Sürelerin yer aldığı tabloyu aşağıda izleyebilirsiniz:
Serial Fork/Join Map-Reduce Callable Thread
2668505068 287335266 2944504598 371450954
1520665434 256402330 220530644 297969676
1466104840 248664714 216625912 344167024
1464082558 253262636 218372802 298162542
1516772250 255020642 239981168 302813920
1524937500 248614678 214424022 298976328
1466208756 248644184 213896322 371450954
1465564738 250441532 213332274 299036198
1478366830 384069164 211912956 299329982
1459489336 248061746 211580254 295550550
Tablodaki değerler yöntemin aldığı süreyi nano saniye biriminde vermektedir. Her bir satır yinelemeyi göstermektedir. Her bir yöntem 10 defa yinelenerek tekrar çalıştırılmıştır. Karşılaştırmayı daha kolay yapabilmek için aşağıdaki grafikten  yararlanabilirsiniz:
Aynı uygulamanın i7 işlemcili 2 fiziksel çekirdeği 4 sanal işlemcisi bulunan Samsung ultrabook (NP-900X4C)'daki başarımı aşağıdaki gibi gerçekleşmiştir:
Serial Fork/Join Map-Reduce Callable Thread
19741743117 580408617 4701616707 626018473
1806017710 448612143 496646177 424259928
1848274359 417033783 502213371 484369695
1597769920 462913357 499692308 408970979
1610160940 416298935 486345985 423605953
1606404182 435899678 534527811 426688621
1625213014 408824009 486342701 626018473
1632420684 422062362 576433047 435159082
1680902197 592661288 489187262 724467204
1653464440 430231904 493916565 406734823
Yukarıdaki tabloyu daha kolay yorumlayabilmek için aşağıdaki grafikten yararlanabilirsiniz:

En iyi sonucu 8 sanal işlemcili makinada Map-Reduce ile 4 sanal işlemcili makinada Callable Thread kullanarak elde ediyoruz. Genel olarak sonuçların biri birine çok yakın olduğunu söyleyebiliriz. Map-Reduce'un farkını daha yüksek çekirdek sayılarında görebiliriz. Map-Reduce ile elde edilen en önemli kazanç geliştirme süresinin kısa olması, testinin ve bakımının kolay olmasıdır. Çözüm yaklaşık 10 satırlık bir kod parçasından oluşmaktadır. Burada Callable Thread ve Fork/Join çözümleri özellikle en iyilenmemiştir, ortalama başarımları karşılaştırılmaya çalışılmıştır. Kodun tamamına NetBeans projesi olarak bu adresten ulaşabilirsiniz.
View Binnur Kurt's profile on LinkedIn

No comments:

Post a Comment