Birden fazla kaynağı iplikler ya da prosesler arasında paylaştırmak gerektiğinde semafor çözümü kullanılır. Her proses semafordan ihtiyaç duyduğu kaynak kadar istekte bulunur. Semafor tam sayı değer saklayan bir nesne olarak düşünülebilir. Bu tam sayının başlangıç değeri kaynak sayısıdır. Semafor üzerinde temel olarak iki işlem tanımlıdır:
- Azaltma: Semaforun değeri, alınan kaynak sayısı kadar azaltılır. Eğer semaforun değeri, negatif bir değere ulaşırsa, çağrıyı yapan proses ya da iplik istekte bulunduğu sayı kadar kaynak hazır olana kadar kuyrukta bekler.
- Artırma: Kaynaklar geri verildiğinde semaforun değeri, geri verilen kaynak sayısı kadar arttırılır. Bekleyen proseslerden, kaynak sayısı karşılananlar proses uyandırılır.
1 #include <sys/ipc.h> 2 #include <sys/msg.h> 3 #include <stdio.h> 4 5 int main(){ 6 key_t key; 7 int id; 8 key= ftok("/home/student/keyfile",1); 9 int flag = (IPC_CREAT | IPC_EXCL | 0400) ; 10 id= msgget(key,flag); 11 printf("Queue ID: %d\n",id); 12 id= shmget(key,4*sizeof(int),flag); 13 printf("Shared Memory ID: %d\n",id); 14 id= semget(key,3,flag); 15 printf("Semaphore ID: %d\n",id); 16 }
[student@godel]$ cc -o ftok ftok.c [student@godel]$ ./ftok Queue ID: 196608 Shared Memory ID: 88276993 Semaphore ID: 32768
[student@godel]$ ipcs ------ Shared Memory Segments -------- key shmid owner perms bytes nattch status 0x00000000 1703936 student 600 393216 2 dest 0x010159a1 88276993 student 400 16 0 ------ Semaphore Arrays -------- key semid owner perms nsems 0x010159a1 32768 student 400 3 ------ Message Queues -------- key msqid owner perms used-bytes messages 0x010159a1 196608 student 400 0 0
[student@godel]$ ipcrm -s 32768 [student@godel]$ ipcrm -m 88276993 [student@godel]$ ipcrm -q 196608 [student@godel]$ ipcs ------ Shared Memory Segments -------- key shmid owner perms bytes nattch status 0x00000000 1703936 student 600 393216 2 dest ------ Semaphore Arrays -------- key semid owner perms nsems ------ Message Queues -------- key msqid owner perms used-bytes messages
Üç farklı kaynağın her birinden sırasıyla 3, 5 ve 10 adet bulunan durum için semaforu, POSIX kütüphanesindeki semget ve semctl çağrılarını kullanarak aşağıdaki şekilde yaratıyoruz:
#define NUM_SEMS_IN_GROUP 3
#define RESOURCE1 0
#define RESOURCE2 1
#define RESOURCE3 2
int semid;
int numResource1 = 3;
int numResource2 = 5;
int numResource3 = 10;
semid = semget(
IPC_PRIVATE,
NUM_SEMS_IN_GROUP,
IPC_CREAT | 0600
);
semctl(semid, RESOURCE1, SETVAL, &numResource1);
semctl(semid, RESOURCE2, SETVAL, &numResource2);
semctl(semid, RESOURCE3, SETVAL, &numResource3);
Bu üç farklı kaynaktan sırayla 2, 3 ve 1 adet kaynak almak için ise semop çağrısını kullanıyoruz:
struct sembuf ops[NUM_SEMS_IN_GROUP];
ops[0].sem_num = RESOURCE1;
ops[0].sem_op = 2;
ops[1].sem_num = RESOURCE2;
ops[1].sem_op = 3;
ops[2].sem_num = RESOURCE3;
ops[2].sem_op = 1;
ops[0].sem_flg = ops[1].sem_flg = ops[2].sem_flg = WAIT;
semop(semid, ops, NUM_SEMS_IN_GROUP);
Şimdi POSIX semaforları kullanarak örnek bir problemi çözelim. Ev taşıma hizmeti veren bir firmanın 5 aracı, 12 taşıyıcısı ve 1000 TL’ye kadar sigortalanacak taşıma kapasitesi olsun. Kendisine gelen farklı taşıma istekleri için bu kaynakların paylaşımını semafor kullanarak çözelim:
Çözümün kaynak kodunu aşağıda bulabilirsiniz:
Araç
Sayısı
|
Taşıyıcı Sayısı
|
Sigortalama
Değeri
|
4
|
5
|
250
|
1
|
2
|
500
|
3
|
5
|
1000
|
2
|
8
|
250
|
#include <sys/types.h> #include <sys/ipc.h> #include <sys/sem.h> #include <pthread.h> #include <stdlib.h> #include <stdio.h> #include "utils.h" #define NUM_THREADS 10 #define TIME_BTWN_NEW_THREADS 0.5 #define RUNTIME_RANGE 5.0 #define NUM_SEMS_IN_GROUP 3 #define TRUCK_SEM 0 #define MOVER_SEM 1 #define INSUR_SEM 2 #define NUM_TRUCKS 5 #define NUM_MOVERS 12 #define AMT_INSUR 1000 #define WAIT 0 #define STRING_SIZE 80 #define FALSE 0 #define TRUE (!FALSE) #define STDOUT_FD 1 struct job { int numTrucks; int numMovers; int amtInsurance; } jobTable[] = { { 4, 5, 250}, { 1, 2, 500}, { 3, 5, 1000}, { 2, 8, 250}, }; int numJobs = sizeof (jobTable) / sizeof (struct job); int semid; extern int errno; void *threadMain(void *); main() { pthread_t threads[NUM_THREADS]; int numTrucks = NUM_TRUCKS; int numMovers = NUM_MOVERS; int amtInsur = AMT_INSUR; int count; if ((semid = semget(IPC_PRIVATE, NUM_SEMS_IN_GROUP, IPC_CREAT | 0600)) == -1) { perror("semget"); exit(errno); } if ((semctl(semid, TRUCK_SEM, SETVAL, &numTrucks)) || (semctl(semid, MOVER_SEM, SETVAL, &numMovers)) || (semctl(semid, INSUR_SEM, SETVAL, &amtInsur))) { perror("Error initializing semaphores"); goto cleanup; } srand48(time(NULL)); for (count = 0; count < NUM_THREADS; count++) { if (pthread_create(&threads[count], NULL, threadMain, (void *) (count % numJobs))){ perror("Error starting reader threads"); goto cleanup; } fractSleep(TIME_BTWN_NEW_THREADS); } for (count = 0; count < NUM_THREADS; count++) { pthread_join(threads[count], (void **) NULL); } cleanup: if (semctl(semid, 0, IPC_RMID, NULL)) { perror("semctl IPC_RMID:"); } } void *threadMain(void * arg) { int jobNum = (int) arg; char string[STRING_SIZE]; sprintf(string, "Job # %d requesting %d trucks, %d people, $%d000 insurance...\n", jobNum, jobTable[jobNum].numTrucks, jobTable[jobNum].numMovers, jobTable[jobNum].amtInsurance); printWithTime(string); if (reserve(semid, jobTable[jobNum])) { perror("reserve"); return (NULL); } sprintf(string, "Job # %d got %d trucks, %d people, %d000 insurance and is running...\n", jobNum, jobTable[jobNum].numTrucks, jobTable[jobNum].numMovers, jobTable[jobNum].amtInsurance); printWithTime(string); fractSleep(drand48() * RUNTIME_RANGE); sprintf(string, "Job # %d done; returning %d trucks, %d people, %d000 insurance...\n", jobNum, jobTable[jobNum].numTrucks, jobTable[jobNum].numMovers, jobTable[jobNum].amtInsurance); printWithTime(string); if (release(semid, jobTable[jobNum])) { perror("release"); } } int release(int semid, struct job thisJob) { return (playWithSemaphores( semid, thisJob.numTrucks, thisJob.numMovers, thisJob.amtInsurance)); } int reserve(int semid, struct job thisJob) { return (playWithSemaphores( semid, -thisJob.numTrucks, -thisJob.numMovers, -thisJob.amtInsurance)); } int playWithSemaphores(int semid, int numTrucks, int numMovers, int amtInsurance) { struct sembuf ops[NUM_SEMS_IN_GROUP]; ops[0].sem_num = TRUCK_SEM; ops[0].sem_op = numTrucks; ops[1].sem_num = MOVER_SEM; ops[1].sem_op = numMovers; ops[2].sem_num = INSUR_SEM; ops[2].sem_op = amtInsurance; ops[0].sem_flg = ops[1].sem_flg = ops[2].sem_flg = WAIT; return (semop(semid, ops, NUM_SEMS_IN_GROUP)); }
class semaphore { bool hasResource(initializer_list<int> values){ int i=0; for (auto & value : values) if(resources[i++]<value) return false; cerr << "Obtained the resources!" << endl ; return true; } public: semaphore(initializer_list<int> inits) { int i=0; for (auto & init : inits) resources.push_back(init); } void release(initializer_list<int> values){ unique_lock<mutex> lck(mtx); int i=0; for (auto & value : values) resources[i++] += value; cv.notify_one(); } void acquire(initializer_list<int> values){ unique_lock<mutex> lck(mtx); while(!hasResource(values)){ cv.wait(lck); } int i=0; for (auto & value : values) resources[i++] -= value; } mutex mtx; condition_variable cv; vector<int> resources; };
POSIX semaforları kullanarak çözdüğümüz problemi şimdi yukarıda oluşturduğumuz semaphore sınıfını kullanarak çözmeye çalışalım:
int jobs[][3] = {
{ 4, 5, 250 }, { 1, 2, 500 }, { 3, 5, 1000 }, { 2, 8, 250 }, }; int main(){ semaphore sem({5, 12, 1000}); vector<thread> tasks; auto run= [&sem] (int i,int job[3] ) { sem.acquire({job[0],job[1],job[2]}); sem.release({job[0],job[1],job[2]}); }; int i=0; for (auto &job : jobs){ tasks.push_back({thread(run,i,job)}); ++i; } for (auto& task: tasks) task.join(); return 0; }
Java’da semafor programlama için Java SE 5 ile gelen java.util.concurrent paketinde yer alan Semaphore sınıfını kullanıyoruz. Semaphore sınıfını kullanarak semaforu yaratırken kaynak sayısını veriyoruz. Semaphore sınıfında kaynak almak için acquire ve alınan kaynağı geri vermek için release metodları bulunuyor. acquire metodu eğer yeterli sayıda kaynak yok ise, çağrıyı yapan ipliği, diğer iplikler release metodu ile yeterli sayıda kaynak bırakıncaya kadar bir kuyrukta bloke eder. Yukarıda, önce POSIX semaforlarını, daha sonra C++11 kullanarak çözdüğümüz problemi, bu kez Java’daki Semaphore sınıfını kullanarak tekrar çözelim:
import java.util.Arrays; import java.util.concurrent.Semaphore; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; public class TransportationProblem { static int jobs[][] = { { 4, 5, 250 }, { 1, 2, 500 }, { 3, 5, 1000 }, { 2, 8, 250 } }; public static void main(String[] args) throws InterruptedException { Thread[] tasks = new Thread[jobs.length]; MultiResource mr = new MultiResource(5, 12, 1000); for (int i = 0; i < tasks.length; ++i) { tasks[i] = new Thread(new Task(mr, jobs[i]),Arrays.toString(jobs[i])); } for (Thread task : tasks) task.start(); for (Thread task : tasks) task.join(); } } class Task implements Runnable { int[] job; MultiResource mr; public Task(MultiResource mr, int[] job) { this.mr = mr; this.job = job; } @Override public void run() { try { mr.acquire(job); System.err.println(Thread.currentThread().getName() + " is using the resource " + Arrays.toString(job) + "..."); mr.release(job); System.err.println(Thread.currentThread().getName() + " has relased the resource " + Arrays.toString(job) + "!"); } catch (InterruptedException e) { e.printStackTrace(); } } } class MultiResource { private volatile Semaphore[] semaphores; public MultiResource(int... permits) { System.err.println("Initializing the semaphores with permits " + Arrays.toString(permits)); semaphores = new Semaphore[permits.length]; for (int i = 0; i < semaphores.length; ++i) { semaphores[i] = new Semaphore(permits[i], true); } } public void acquire(int... permits) throws InterruptedException { System.err.println(Thread.currentThread().getName() + " is acquiring the resources " + Arrays.toString(permits)); for (int i = 0; i < permits.length; ++i) { System.err.println(Thread.currentThread().getName() + " acquired the resource(" + (i + 1) + ") " + permits[i]); semaphores[i].acquire(permits[i]); } System.err.println(Thread.currentThread().getName() + " acquired the resources " + Arrays.toString(permits) + "...finally!"); } public void release(int... permits) throws InterruptedException { System.err.println(Thread.currentThread().getName() + " is releasing " + Arrays.toString(permits) + "..."); for (int i = 0; i < permits.length; ++i) { semaphores[i].release(permits[i]); } System.err.println(Thread.currentThread().getName() + " is releasing " + Arrays.toString(permits) + "...done."); } }