Wednesday, August 7, 2013

Linux'da pthread Kütüphanesi ile Çok Çekirdekli Programlama

İşlemcilerin çekirdek sayıları her yeni modelde artmaya devam ediyor. Dolayısı ile eskiden olduğu gibi problemler seri çözülmeye çalışıldığında başarım sınırlı olur, çözüm dikey olarak ölçeklenemez. Çözümü iki çekirdekli bir makinadan 8 çekirdekli bir makinaya taşıdığınızda başarım değişmez. Hangi platform üzerinde ya da hangi programlama dilini kullanarak uygulama geliştiriyor olursanız olun, çözümünüze çok çekirdekli programlamayı tanıştırmanız gerekir. 
Linux platformunda temel geliştirme dili C/C++'dır. Çok parçacıklı programlama için ise Native POSIX Thread (pthread) kütüphanesini kullanıyoruz. pthread bir parçacığı belirli bir çekirdeğe atamak olanağı veriyor. 10 milyon elemanlı bir dizinin elemanları toplamını paralel olarak hesaplayan kodu pthread kütüphanesi kullanarak C++'da kodladım. Aşağıda çözümün kaynak kodunu inceleyebilirsiniz. Kodu derlemek için aşağıdaki komutu çalıştırmanız gerekir:

[student@godel1 mod01]$ g++ parallel_sum.cpp -o parallel_sum -lpthread -pthread
[student@godel1 mod01]$ ./parallel_sum 
Number of virtual cpu's is 2.
Sum of array (Serial) is 45011704
Sum of array (Parallel) is 45011704

Çözümde N adet iş parçacığı kullanılıyor. Bu N adet iş parçacığı int bind_self_to_core(int core_id) fonksiyonu yardımı ile çekirdeklere eşit olarak dağıtılıyor. İş parçacığının yaptığı iş ise void * sum (void *param) fonksiyonunda kodlanmıştır.

parallel_sum.cpp:
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
#include <iostream>
#include <pthread.h>
#include <cstdlib>
#include <unistd.h>

using namespace std;

const int NUMBER_OF_CORES =  sysconf(_SC_NPROCESSORS_ONLN);

struct problem {
    int *array;
    int start;
    int size;
} ;

typedef struct problem sproblem;

int bind_self_to_core(int core_id) {
    core_id = core_id % NUMBER_OF_CORES ;

    cpu_set_t cpuset;
    CPU_ZERO(&cpuset);
    CPU_SET(core_id, &cpuset);

    pthread_t current_thread = pthread_self();
    return pthread_setaffinity_np(current_thread, sizeof(cpu_set_t), &cpuset);
}

void * sum (void *param) {
  static int counter=0;
  bind_self_to_core(counter++);
  struct problem *p= (struct problem*) param ;
  int *arr= p->array + p->start;
  long partialSum= 0;
  for (int i=0;i<p->size;++i,++arr){
      partialSum += arr[0];
  }
  return (void*) partialSum;
}

const int N = 10;
const int ARRAY_SIZE = 10000000;
const int PROBLEM_SIZE= ARRAY_SIZE/N;

int main () {
    pthread_t *threads= new pthread_t[N];
    struct problem *problem_instances= new struct problem[N];
    int *array= new int[ARRAY_SIZE];
    int *p= array;
    long sumOfArray=0;
    cout << "Number of virtual cpu is " << NUMBER_OF_CORES << "." << endl ;
    for (int i=0;i<ARRAY_SIZE;++i,++p){
        p[0]= rand() % 10 ;
        sumOfArray += p[0];
    }
    cout << "Sum of array (Serial) is " << sumOfArray << endl;
    /* Create n threads, each working with a different array partition. */
    for (int i=0,start=0;i<N;++i,start+=PROBLEM_SIZE){
        problem_instances[i].array= array;
        problem_instances[i].size= PROBLEM_SIZE;
        problem_instances[i].start= start;
        pthread_create (threads+i, NULL, sum, (void *) (problem_instances+i));
    }
    sumOfArray=0;
    for (int i=0;i<N;++i){
        long partialSum;
        pthread_join (threads[i], (void**)&partialSum);
        sumOfArray += partialSum;
    }
    cout << "Sum of array (Parallel) is " << sumOfArray << endl;
    delete[] array;
    delete[] threads;
    delete[] problem_instances;
        return 0;
}