-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathworkers_threads.cpp
146 lines (138 loc) · 5.39 KB
/
workers_threads.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
#include "workers_threads.hpp"
#include <unistd.h>
#include <iostream>
#include <string>
#include <fstream>
#include <algorithm>
#include <netinet/in.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <string.h>
#include <queue>
#include <pthread.h>
#include <unordered_map>
#include "global.hpp"
#include "workers_threads.hpp"
using namespace std;
// Starts `numworkers` threads that each run worker() and returns the
// heap-allocated array holding their ids.
//
// The array is allocated with new[]; the caller owns it and must release it
// with delete[] (join_threads() in this file does so after joining).
// A negative `numworkers` is treated as zero, yielding an empty array.
// If any thread cannot be created, the reason is printed to stderr and the
// process exits with status 1.
pthread_t* create_workers(int numworkers){
    // Clamp so a negative count cannot make new[] throw.
    const int count = numworkers < 0 ? 0 : numworkers;
    pthread_t* worker_threads = new pthread_t[count];
    for (int i = 0; i < count; i++) {
        // pthread_create reports failure through its return value and does
        // not set errno, so perror() would print an unrelated message here.
        const int rc = pthread_create(&worker_threads[i], NULL, worker, NULL);
        if (rc != 0) {
            std::cerr << "thread create: " << strerror(rc) << std::endl;
            exit(1);
        }
    }
    return worker_threads;
}
// Largest number of bytes accepted for a single client reply (name or vote).
static const std::size_t kMaxReplyBytes = 200;

// Sends all of `text` on `fd`, retrying after partial sends.
// Returns false if the send fails; the error is reported with perror("send").
static bool send_text(int fd, const std::string& text)
{
    std::size_t done = 0;
    while (done < text.size()) {
        const ssize_t sent = send(fd, text.data() + done, text.size() - done, 0);
        if (sent <= 0) {
            if (sent < 0) {
                perror("send");
            }
            return false;
        }
        done += static_cast<std::size_t>(sent);
    }
    return true;
}

// Reads one reply from `fd` into `out`, cut at the first '\r' or '\n'.
// Returns false on a read error (reported with perror("read")) or when the
// peer closed the connection without sending anything.
static bool read_reply(int fd, std::string& out)
{
    char raw[kMaxReplyBytes + 1]; // +1 keeps room for the terminator
    const ssize_t got = read(fd, raw, kMaxReplyBytes);
    if (got < 0) {
        perror("read");
        return false;
    }
    if (got == 0) {
        return false; // end of stream: nothing to process
    }
    raw[got] = '\0'; // read() does not terminate the data; strcspn needs it
    raw[strcspn(raw, "\r\n")] = '\0';
    out = raw;
    return true;
}

// Runs the name/vote dialogue with one client. Never closes `clientfd`;
// the caller does that exactly once whatever the outcome.
static void serve_client(int clientfd)
{
    std::string name;
    if (!send_text(clientfd, "SEND NAME PLEASE\n") || !read_reply(clientfd, name)) {
        return;
    }

    // Early rejection so a known voter is not asked for a vote at all.
    pthread_mutex_lock(&vmtx);
    const bool known =
        std::find(voter_names.begin(), voter_names.end(), name) != voter_names.end();
    pthread_mutex_unlock(&vmtx);
    if (known) {
        send_text(clientfd, "ALREADY VOTED\n");
        return;
    }

    std::string vote;
    if (!send_text(clientfd, "SEND VOTE PLEASE\n") || !read_reply(clientfd, vote)) {
        return;
    }

    // Check again and insert inside one critical section: another worker may
    // have recorded the same name while this one was waiting for the vote.
    pthread_mutex_lock(&vmtx);
    const bool raced =
        std::find(voter_names.begin(), voter_names.end(), name) != voter_names.end();
    if (!raced) {
        voter_names.push_back(name);
    }
    pthread_mutex_unlock(&vmtx);
    if (raced) {
        send_text(clientfd, "ALREADY VOTED\n");
        return;
    }

    pthread_mutex_lock(&pmtx);
    party_votes[vote]++;
    pthread_mutex_unlock(&pmtx);

    // Log before confirming so every counted vote reaches the log even when
    // the confirmation cannot be delivered.
    pthread_mutex_lock(&fmtx);
    {
        std::ofstream log(txt1.c_str(), std::ios::app); // append, never overwrite
        log << name << " " << vote << std::endl;
    } // stream is closed here, before the mutex is released
    pthread_mutex_unlock(&fmtx);

    send_text(clientfd, "VOTE for Party " + vote + " RECORDED\n");
}

// Thread entry point for a worker.
//
// Repeatedly takes a client descriptor from the shared `buffer` (waiting on
// `buffer_nonempty` while it is empty and signalling `buffer_nonfull` after
// removing an entry), serves that client and closes the descriptor.
// A failure while serving one client only ends that client's session; the
// thread keeps running. The loop ends, and NULL is returned, once
// `terminateServer` is observed set after a descriptor has been dequeued.
// `arg` is unused.
void* worker(void* arg)
{
    (void)arg;
    while (true) {
        pthread_mutex_lock(&bmtx);
        while (buffer.size() == 0) {
            pthread_cond_wait(&buffer_nonempty, &bmtx);
        }
        const int clientfd = buffer.front();
        buffer.pop();
        pthread_cond_signal(&buffer_nonfull);
        pthread_mutex_unlock(&bmtx);

        pthread_mutex_lock(&terminateMutex);
        const bool stopping = terminateServer;
        pthread_mutex_unlock(&terminateMutex);
        if (stopping) {
            // Do not leak the descriptor that was just dequeued.
            if (clientfd >= 0) {
                close(clientfd);
            }
            break;
        }

        serve_client(clientfd);
        close(clientfd);
    }
    return NULL;
}
// Accepts one connection on the listening socket `lsock` and hands the new
// descriptor to the workers through the shared `buffer`.
//
// `csock` is only used as local storage for the accepted descriptor (it is
// passed by value, so the caller never sees the result).
// `buffersize` is the maximum number of descriptors kept in `buffer`; the
// call blocks on `buffer_nonfull` while that limit is reached.
//
// If accept() fails, the error is reported with perror and nothing is queued,
// unless `terminateServer` is set.
void accept_connection(int csock,int lsock,int buffersize)
{
    csock = accept(lsock, NULL, NULL);
    if (csock < 0) {
        perror(" accept "); // report first, before anything can change errno

        pthread_mutex_lock(&terminateMutex);
        const bool stopping = terminateServer;
        pthread_mutex_unlock(&terminateMutex);
        if (!stopping) {
            return; // transient failure: do not hand an invalid fd to a worker
        }
        // NOTE(review): during shutdown the failed descriptor is still queued,
        // exactly as before this change. That entry looks like what wakes a
        // worker blocked on an empty buffer so it can see the flag - confirm.
    }

    pthread_mutex_lock(&bmtx);
    // Cast avoids a signed/unsigned comparison; >= also covers a buffer that
    // somehow grew past the limit.
    while (buffer.size() >= static_cast<size_t>(buffersize)) {
        pthread_cond_wait(&buffer_nonfull, &bmtx);
    }
    buffer.push(csock);
    pthread_cond_signal(&buffer_nonempty); // tell a worker there is work
    pthread_mutex_unlock(&bmtx);
}
// Waits for the first `numWorkerthreads` threads in `workers_threads` to
// finish, in array order, then frees the array with delete[].
// The exit value of each thread is discarded.
void join_threads(int numWorkerthreads,pthread_t * workers_threads){
    int joined = 0;
    while (joined < numWorkerthreads) {
        pthread_join(workers_threads[joined], NULL);
        ++joined;
    }
    delete[] workers_threads;
}