/*
* Author, Copyright: Oleg Borodin <onborodin@gmail.com>
*/
#include <stdlib.h>
#include <stdio.h>
#include <stdbool.h>
#include <unistd.h>
#include <string.h>
#include <fcntl.h>
#include <pthread.h>
#include <sys/event.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/time.h>
#include <err.h>
void msleep(int dur) {
usleep(dur * 1000);
}
typedef struct queue {
pthread_mutex_t mutex;
int* items;
int front;
int rear;
int size;
int capacity;
} queue_t;
queue_t* queue_create(int size) {
queue_t* q = (queue_t*)malloc(sizeof(queue_t));
q->items = (int*)malloc((size + 1) * sizeof(int));
q->capacity = size;
q->front = 0;
q->rear = -1;
q->size = 0;
pthread_mutexattr_t attr;
pthread_mutexattr_init(&attr);
pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
pthread_mutex_init(&(q->mutex), NULL);
pthread_mutexattr_destroy(&attr);
return q;
}
int queue_size(queue_t* q) {
if (q == NULL) return -1;
pthread_mutex_lock(&(q->mutex));
int res = q->size;
pthread_mutex_unlock(&(q->mutex));
return res;
}
int queue_front(queue_t* q) {
if (q == NULL) return -1;
pthread_mutex_lock(&(q->mutex));
int res = -1;
if (q->size > 0) {
res = q->items[q->front];
q->front = (q->front + 1) % q->capacity;
q->size--;
}
pthread_mutex_unlock(&(q->mutex));
return res;
}
int queue_push(queue_t* q, int x) {
if (q == NULL) return -1;
pthread_mutex_lock(&q->mutex);
int res = -1;
if ((q->size + 1) > q->capacity) {
int newcapacity = q->capacity * 2;
void* items = realloc(q->items, sizeof(int) * newcapacity);
if (items != NULL) {
q->items = items;
q->capacity = newcapacity;
}
}
if ((q->size + 1) > q->capacity) {
pthread_mutex_unlock(&(q->mutex));
return -1;
}
q->rear = (q->rear + 1) % q->capacity;
q->items[q->rear] = x;
q->size++;
res = 1;
pthread_mutex_unlock(&(q->mutex));
return res;
}
void queue_lock(queue_t* q) {
pthread_mutex_lock(&(q->mutex));
}
void queue_unlock(queue_t* q) {
pthread_mutex_unlock(&(q->mutex));
}
void queue_free(queue_t* q) {
if (q == NULL) return;
free(q->items);
free(q);
}
typedef struct worker {
pthread_mutex_t mutex;
pthread_t* thread;
pthread_cond_t* cond;
queue_t* queue;
} worker_t;
worker_t* worker_create(queue_t* queue, pthread_cond_t* cond, void* handler(void*)) {
worker_t* w = (worker_t*)malloc(sizeof(worker_t));
w->queue = queue;
w->cond = cond;
w->thread = (pthread_t*)malloc(sizeof(pthread_t));
pthread_mutex_init(&(w->mutex), NULL);
pthread_create(w->thread, NULL, handler, (void*)w);
pthread_detach(*(w->thread));
return w;
}
typedef struct wrkpool {
int size;
worker_t** workers;
queue_t* queue;
pthread_cond_t cond;
} wrkpool_t;
wrkpool_t* wrkpool_create(const int size, queue_t* queue, void* handler) {
wrkpool_t* wp = (wrkpool_t*)malloc(sizeof(wrkpool_t));
wp->queue = queue;
wp->workers = (worker_t**)malloc(sizeof(worker_t*) * (size + 1));
pthread_cond_init(&(wp->cond), NULL);
for (int i = 0; i < size; i++) {
wp->workers[i] = worker_create(queue, &(wp->cond), handler);
}
wp->size = size;
return wp;
}
int wrkpool_enqueue(wrkpool_t* wp, int newsock) {
queue_push(wp->queue, newsock);
for (int i = 0; i < wp->size; i++) {
worker_t* w = wp->workers[i];
pthread_mutex_unlock(&(w->mutex));
}
pthread_cond_broadcast(&(wp->cond));
return 1;
}
void socket_rw(int sock) {
const int reqcap = 1024;
char reqbuf[reqcap] = "";
memset(reqbuf, 0, reqcap);
int rv = 0;
int trv = 0;
int rep = 0;
while(1) {
const int tmpsize = 1024 * 2;
char tmpbuf[tmpsize];
memset(tmpbuf, 0, tmpsize);
rv = read(sock, tmpbuf, tmpsize);
strncat(reqbuf, tmpbuf, rv);
trv += rv;
if (strnstr(reqbuf, "\r\n\r\n", trv) != NULL) break;
if (++rep > 4) {
close(sock);
return;
}
}
if (trv > 0) {
char content[] = "<html>\r\n"
"<head><title>Hello</title></head>\r\n"
"<body>\r\n"
"<center><h1>Hello, World!</h1></center>\r\n"
"<hr><center>Srv11/0.1</center>\r\n"
"</body>\r\n"
"</html>\r\n";
char header[] = "HTTP/1.1 200 OK\r\n"
"Accept-Ranges: bytes\r\n"
"Content-Type: text/html\r\n"
"Content-Length: 142\r\n"
"Conection: close\r\n"
"Server: Srv11/0.1\r\n"
"Date: Wed, 16 May 2019 20:05:07 UTC\r\n"
"\r\n";
const int rescap = 1024;
char res[rescap] = "";
strcat(res, header);
strcat(res, content);
write(sock, res, strlen(res));
shutdown(sock, SHUT_RDWR);
}
close(sock);
}
void* handler(void* argp) {
worker_t* w = (worker_t*)argp;
queue_t* queue = w->queue;
while(true) {
pthread_mutex_lock(&(w->mutex));
while(queue_size(queue) == 0) {
pthread_cond_wait(w->cond, &(w->mutex));
}
queue_lock(queue);
if (queue_size(queue) == 0) {
continue;
queue_unlock(queue);
}
int sock = queue_front(queue);
queue_unlock(queue);
socket_rw(sock);
pthread_mutex_unlock(&(w->mutex));
}
return NULL;
}
int socket_create(int port, int backlog) {
int sock;
if((sock = socket(PF_INET, SOCK_STREAM, 0)) < 0) {
fprintf(stderr, "cannot create socket, exit\n");
exit(1);
}
int optval;
optval = 1;
if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval)) < 0) {
fprintf(stderr, "cannot set socket option, exit\n");
exit(1);
}
struct sockaddr addr;
struct sockaddr_in* paddr = (struct sockaddr_in*)&addr;
paddr->sin_family = AF_INET;
paddr->sin_addr.s_addr = INADDR_ANY;
paddr->sin_port = htons(port);
paddr->sin_len = sizeof(struct sockaddr_in);
if (bind(sock, (struct sockaddr*)paddr, paddr->sin_len) < 0) {
fprintf(stderr, "cannot bind socket, exit\n");
exit(1);
}
if (listen(sock, backlog) < 0) {
fprintf(stderr, "cannot listen socket, exit\n");
exit(1);
}
return sock;
}
int main(int argc, char **argv) {
queue_t* queue = queue_create(1024 * 4);
wrkpool_t* wrkpool = wrkpool_create(50, queue, handler);
int sock = socket_create(1024, 4096);
while (1) {
int newsock = 0;
if ((newsock = accept(sock, NULL, 0)) > 3) {
struct timeval tv;
tv.tv_sec = 3;
tv.tv_usec = 0;
if (setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(struct timeval)) < 0) {
fprintf(stderr, "cannot set socket option, exit\n");
exit(1);
}
if (wrkpool_enqueue(wrkpool, newsock) < 0) {
close(newsock);
}
}
}
close(sock);
return 0;
}