Saturday, December 26, 2015

Semafor

posix semaphore ile ilgili görsel sonucu
Bu yazıda prosesler arası kaynak paylaşımı ve eş zamanlama çözümü için kullanılan semafor çözümlerini inceleyeceğiz. İlk olarak, Unix’deki Inter Process Communication (IPC) alt yapısını kullanarak semafor programlama yapacağız. Daha sonra sırasıyla C++11’de ve Java platformundaki semafor çözümlerini çalışılacağız.
Birden fazla kaynağı iplikler ya da prosesler arasında paylaştırmak gerektiğinde semafor çözümü kullanılır. Her proses semafordan ihtiyaç duyduğu kaynak kadar istekte bulunur. Semafor tam sayı değer saklayan bir nesne olarak düşünülebilir. Bu tam sayının başlangıç değeri kaynak sayısıdır. Semafor üzerinde temel olarak iki işlem tanımlıdır:

  • Azaltma: Semaforun değeri, alınan kaynak sayısı kadar azaltılır. Eğer semaforun değeri, negatif bir değere ulaşırsa, çağrıyı yapan proses ya da iplik istekte bulunduğu sayı kadar kaynak hazır olana kadar kuyrukta bekler.
  • Artırma: Kaynaklar geri verildiğinde semaforun değeri, geri verilen kaynak sayısı kadar arttırılır. Bekleyen proseslerden, kaynak sayısı karşılananlar proses uyandırılır.
Unix işletim sisteminde, çekirdek tüm kaynaklar için tekil bir anahtar kullanır. Proses arası iletişim yapısı için kullanılan kaynaklar için de tekil bir anahtar üretilmesi gerekir. Bunun için ftok() fonksiyonu kullanılır. ftok() fonksiyonuna parametre olarak sistemdeki bir dosyanın tam yolu verilir:
  1 #include <sys/ipc.h>
  2 #include <sys/msg.h>
  3 #include <stdio.h>
  4 
  5 int main(){
  6     key_t key;
  7     int id;
  8     key= ftok("/home/student/keyfile",1);
  9     int flag = (IPC_CREAT | IPC_EXCL | 0400) ;
 10     id= msgget(key,flag);
 11     printf("Queue ID: %d\n",id);
 12     id= shmget(key,4*sizeof(int),flag);
 13     printf("Shared Memory ID: %d\n",id);
 14     id= semget(key,3,flag);
 15     printf("Semaphore ID: %d\n",id);
 16 } 
Bu kodu aşağıdaki gibi derleyip çalıştırıyoruz:
[student@godel]$ cc -o ftok ftok.c
[student@godel]$ ./ftok
Queue ID: 196608
Shared Memory ID: 88276993
Semaphore ID: 32768
Bize ürettiği tekil anahtarın değeri mesaj kuyruğu için 196608, paylaşılan bellek için 88276993 ve semafor için ise 32768’dir. Üretilen tekil anahtarları, kaynak türlerine göre listelemek için ipcs komutundan yararlanıyoruz:
[student@godel]$ ipcs

------ Shared Memory Segments --------
key        shmid      owner      perms      bytes      nattch     status     
0x00000000 1703936    student    600        393216     2          dest        
0x010159a1 88276993   student    400        16         0                      

------ Semaphore Arrays --------
key        semid      owner      perms      nsems    
0x010159a1 32768      student    400        3        

------ Message Queues --------
key        msqid      owner      perms      used-bytes   messages   
0x010159a1 196608     student    400        0            0 
Tekil anahtar üretirken her üç kaynak türü için de aynı dosya yolunu kullandığımıza dikkat edin. Normalde farklı kaynak türleri farklı dosya yolu kullanılır. Anahtarı silmek için ise ipcrm komutunu kullanıyoruz. Komuta parametre olarak kaynağın türünü ve anahtar kimliğini veriyoruz:
[student@godel]$ ipcrm -s 32768
[student@godel]$ ipcrm -m 88276993
[student@godel]$ ipcrm -q 196608
[student@godel]$ ipcs

------ Shared Memory Segments --------
key        shmid      owner      perms      bytes      nattch     status      
0x00000000 1703936    student    600        393216     2          dest        

------ Semaphore Arrays --------
key        semid      owner      perms      nsems    

------ Message Queues --------
key        msqid      owner      perms      used-bytes   messages
Mesaj kuyruğunu silmek için –q, semafor silmek için –s ve paylaşılan bellek için -m seçeneğini kullanıyoruz.
Üç farklı kaynağın her birinden sırasıyla 3, 5 ve 10 adet bulunan durum için semaforu, POSIX kütüphanesindeki semget ve semctl çağrılarını kullanarak aşağıdaki şekilde yaratıyoruz:

#define NUM_SEMS_IN_GROUP 3
#define RESOURCE1 0
#define RESOURCE2 1
#define RESOURCE3 2

int semid;

int numResource1 = 3;
int numResource2 = 5;
int numResource3 = 10;

semid = semget(
     IPC_PRIVATE,
     NUM_SEMS_IN_GROUP,
     IPC_CREAT | 0600
);

semctl(semid, RESOURCE1, SETVAL, &numResource1);
semctl(semid, RESOURCE2, SETVAL, &numResource2);
semctl(semid, RESOURCE3, SETVAL, &numResource3);

Bu üç farklı kaynaktan sırayla 2, 3 ve 1 adet kaynak almak için ise semop çağrısını kullanıyoruz:

struct sembuf ops[NUM_SEMS_IN_GROUP];
ops[0].sem_num = RESOURCE1;  
ops[0].sem_op = 2;
ops[1].sem_num = RESOURCE2;  
ops[1].sem_op = 3;
ops[2].sem_num = RESOURCE3;  
ops[2].sem_op = 1;

ops[0].sem_flg = ops[1].sem_flg = ops[2].sem_flg = WAIT;
semop(semid, ops, NUM_SEMS_IN_GROUP);

Şimdi POSIX semaforları kullanarak örnek bir problemi çözelim. Ev taşıma hizmeti veren bir firmanın 5 aracı, 12 taşıyıcısı ve 1000 TL’ye kadar sigortalanacak taşıma kapasitesi olsun. Kendisine gelen farklı taşıma istekleri için bu kaynakların paylaşımını semafor kullanarak çözelim:  
Araç 
Sayısı
Taşıyıcı Sayısı
Sigortalama 
Değeri
4
5
250
1
2
500
3
5
1000
2
8
250
Çözümün kaynak kodunu aşağıda bulabilirsiniz:
#include <sys/types.h> 
#include <sys/ipc.h>
#include <sys/sem.h>
#include <pthread.h>
#include <stdlib.h> 
#include <stdio.h>

#include "utils.h"

#define NUM_THREADS 10
#define TIME_BTWN_NEW_THREADS 0.5
#define RUNTIME_RANGE 5.0

#define NUM_SEMS_IN_GROUP 3
#define TRUCK_SEM 0
#define MOVER_SEM 1
#define INSUR_SEM 2

#define NUM_TRUCKS 5
#define NUM_MOVERS 12
#define AMT_INSUR 1000

#define WAIT 0

#define STRING_SIZE 80

#define FALSE 0
#define TRUE (!FALSE)

#define STDOUT_FD 1

struct job {
    int numTrucks;
    int numMovers;
    int amtInsurance;
} jobTable[] = {
    { 4, 5, 250},
    { 1, 2, 500},
    { 3, 5, 1000},
    { 2, 8, 250},
};

int numJobs = sizeof (jobTable) / sizeof (struct job);
int semid;
extern int errno;
void *threadMain(void *);

main() {
    pthread_t threads[NUM_THREADS]; 

    int numTrucks = NUM_TRUCKS;
    int numMovers = NUM_MOVERS;
    int amtInsur = AMT_INSUR;

    int count;

    if ((semid = semget(IPC_PRIVATE, NUM_SEMS_IN_GROUP, IPC_CREAT | 0600)) == -1) {
        perror("semget");
        exit(errno);
    }

    if ((semctl(semid, TRUCK_SEM, SETVAL, &numTrucks)) ||
        (semctl(semid, MOVER_SEM, SETVAL, &numMovers)) ||
        (semctl(semid, INSUR_SEM, SETVAL, &amtInsur))) {
        perror("Error initializing semaphores");
        goto cleanup;
    }

    srand48(time(NULL));

    for (count = 0; count < NUM_THREADS; count++) {
        if (pthread_create(&threads[count], NULL, threadMain, (void *) (count % numJobs))){
            perror("Error starting reader threads");
            goto cleanup;
        }
        fractSleep(TIME_BTWN_NEW_THREADS);
    }

    for (count = 0; count < NUM_THREADS; count++) {
        pthread_join(threads[count], (void **) NULL);
    }

cleanup:
    if (semctl(semid, 0, IPC_RMID, NULL)) {
        perror("semctl IPC_RMID:");
    }
}

void *threadMain(void * arg) {
    int jobNum = (int) arg;
    char string[STRING_SIZE];

    sprintf(string,
            "Job # %d requesting %d trucks, %d people, $%d000 insurance...\n",
            jobNum, jobTable[jobNum].numTrucks, jobTable[jobNum].numMovers,
            jobTable[jobNum].amtInsurance);
    printWithTime(string);

    if (reserve(semid, jobTable[jobNum])) {
        perror("reserve");
        return (NULL);
    }

    sprintf(string,
            "Job # %d got %d trucks, %d people, %d000 insurance and is running...\n",
            jobNum, jobTable[jobNum].numTrucks, jobTable[jobNum].numMovers,
            jobTable[jobNum].amtInsurance);
    printWithTime(string);

    fractSleep(drand48() * RUNTIME_RANGE);

    sprintf(string,
            "Job # %d done;  returning %d trucks, %d people, %d000 insurance...\n",
            jobNum, jobTable[jobNum].numTrucks, jobTable[jobNum].numMovers,
            jobTable[jobNum].amtInsurance);
    printWithTime(string);

    if (release(semid, jobTable[jobNum])) {
        perror("release");
    }
}

int release(int semid, struct job thisJob) {
    return (playWithSemaphores(
            semid, thisJob.numTrucks, thisJob.numMovers, thisJob.amtInsurance));
}

int reserve(int semid, struct job thisJob) {
    return (playWithSemaphores(
            semid, -thisJob.numTrucks, -thisJob.numMovers, -thisJob.amtInsurance));
}

int playWithSemaphores(int semid, int numTrucks, int numMovers, int amtInsurance) {

    struct sembuf ops[NUM_SEMS_IN_GROUP];

    ops[0].sem_num = TRUCK_SEM;
    ops[0].sem_op = numTrucks;
    ops[1].sem_num = MOVER_SEM;
    ops[1].sem_op = numMovers;
    ops[2].sem_num = INSUR_SEM;
    ops[2].sem_op = amtInsurance;

    ops[0].sem_flg = ops[1].sem_flg = ops[2].sem_flg = WAIT;

    return (semop(semid, ops, NUM_SEMS_IN_GROUP));
}
C++11’de iplik programlama ve eş zamanlama ile ilgili hazır çözümler gelmiş olsa da semafor çözümünü içermez. Ancak semafor mekanizmasını mutex sınıfını kullanarak gerçekleştirebiliriz:
class semaphore {
  bool hasResource(initializer_list<int> values){
 int i=0;
 for (auto & value : values)
        if(resources[i++]<value) return false; 
 cerr << "Obtained the resources!" << endl ; 
 return true;  
  }
public:

  semaphore(initializer_list<int> inits) {
      int i=0;
      for (auto & init : inits)
          resources.push_back(init); 
  }

  void release(initializer_list<int> values){
    unique_lock<mutex> lck(mtx);
    int i=0;
    for (auto & value : values)
        resources[i++] += value;     
    cv.notify_one();
  }

  void acquire(initializer_list<int> values){
    unique_lock<mutex> lck(mtx);
 
    while(!hasResource(values)){
      cv.wait(lck);
    }
    int i=0;
    for (auto & value : values)
        resources[i++] -= value;     
  }

  mutex mtx;
  condition_variable cv;
  vector<int> resources;
};
POSIX semaforları kullanarak çözdüğümüz problemi şimdi yukarıda oluşturduğumuz semaphore sınıfını kullanarak çözmeye çalışalım:
int jobs[][3= {
{ 4, 5, 250 }, 
  { 1, 2, 500 }, 
  { 3, 5, 1000 },
  { 2, 8, 250 },
};

int main(){
  semaphore sem({5, 12, 1000});
  vector<thread> tasks;
  auto run= [&sem] (int i,int job[3] ) {
    sem.acquire({job[0],job[1],job[2]});
    sem.release({job[0],job[1],job[2]});
  };
  int i=0;
  for (auto &job : jobs){
    tasks.push_back({thread(run,i,job)});
    ++i;
  }
  for (auto& task: tasks)
    task.join();
  return 0;
}
Java’da semafor programlama için Java SE 5 ile gelen java.util.concurrent paketinde yer alan Semaphore sınıfını kullanıyoruz. Semaphore sınıfını kullanarak semaforu yaratırken kaynak sayısını veriyoruz. Semaphore sınıfında kaynak almak için acquire ve alınan kaynağı geri vermek için release metodları bulunuyor. acquire metodu eğer yeterli sayıda kaynak yok ise, çağrıyı yapan ipliği, diğer iplikler release metodu ile yeterli sayıda kaynak bırakıncaya kadar bir kuyrukta bloke eder. Yukarıda, önce POSIX semaforlarını, daha sonra C++11 kullanarak çözdüğümüz problemi, bu kez Java’daki Semaphore sınıfını kullanarak tekrar çözelim:
import java.util.Arrays;
import java.util.concurrent.Semaphore;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class TransportationProblem {
   static int jobs[][] = { 
            { 4, 5, 250 }, 
            { 1, 2, 500 }, 
            { 3, 5, 1000 }, 
            { 2, 8, 250 } 
       };
   public static void main(String[] args) throws InterruptedException {
     Thread[] tasks = new Thread[jobs.length];
     MultiResource mr = new MultiResource(5, 12, 1000);
     for (int i = 0; i < tasks.length; ++i) {
      tasks[i] = new Thread(new Task(mr, jobs[i]),Arrays.toString(jobs[i]));
     }
     for (Thread task : tasks)
        task.start();
     for (Thread task : tasks)
        task.join();
  }

}

class Task implements Runnable {
   int[] job;
   MultiResource mr;

   public Task(MultiResource mr, int[] job) {
      this.mr = mr;
      this.job = job;
   }

   @Override
   public void run() {
     try {
        mr.acquire(job);
        System.err.println(Thread.currentThread().getName() 
               + " is using the resource " + Arrays.toString(job) + "...");
        mr.release(job);
        System.err.println(Thread.currentThread().getName() 
               + " has relased the resource " + Arrays.toString(job) + "!");
     } catch (InterruptedException e) {
       e.printStackTrace();
     }

  }

}
class MultiResource {
   private volatile Semaphore[] semaphores;

   public MultiResource(int... permits) {
      System.err.println("Initializing the semaphores with permits " 
                             + Arrays.toString(permits));
      semaphores = new Semaphore[permits.length];
      for (int i = 0; i < semaphores.length; ++i) {
        semaphores[i] = new Semaphore(permits[i], true);
      }
   }

   public void acquire(int... permits) throws InterruptedException {
      System.err.println(Thread.currentThread().getName() 
              + " is acquiring the resources " + Arrays.toString(permits));
      for (int i = 0; i < permits.length; ++i) {
          System.err.println(Thread.currentThread().getName() 
              + " acquired the resource(" + (i + 1) + ") " + permits[i]);
          semaphores[i].acquire(permits[i]);
      }
      System.err.println(Thread.currentThread().getName() 
              + " acquired the resources " + Arrays.toString(permits)
       + "...finally!");
   }

   public void release(int... permits) throws InterruptedException {
      System.err.println(Thread.currentThread().getName() 
                + " is releasing " + Arrays.toString(permits) + "...");
      for (int i = 0; i < permits.length; ++i) {
          semaphores[i].release(permits[i]);
      }
      System.err.println(Thread.currentThread().getName() + 
             " is releasing " + Arrays.toString(permits) + "...done.");
   }
}

No comments:

Post a Comment