User Tools

Site Tools


No-blocked stream reader

I wrote it only as rapid illustration for method non-blocked reading from blocked stream.

thr-recv.c
/*
 * Copyright 2004-2019 Oleg Borodin  <borodin@unix7.org>
 */
#include <stdlib.h>
#include <stdio.h>
#include <stdbool.h>
#include <unistd.h>
#include <string.h>
#include <fcntl.h>
#include <signal.h>
 
#include <pthread.h>
 
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/time.h>
 
#include <err.h>
 
 
void msleep(int dur) {
    usleep(dur * 1000);
}
 
long int timestamp() {
    struct timeval tp;
    gettimeofday(&tp, NULL);
    return tp.tv_sec * 1000 + tp.tv_usec / 1000;
}
 
typedef struct stream {
    pthread_cond_t readcond;
    pthread_mutex_t readmutex;
 
    pthread_t* reader_thr;
    pthread_t* watcher_thr;
 
    int sockfd;
    char* buffer;
    int bufcap;
    int readsize;
    long int lastread;
    long int timeout;
    int done;
} stream_t;
 
void stream_sighandler(int signo) {
    fprintf(stderr, "have signal, thread id %d, exit\n", (int)pthread_self());
    pthread_exit(NULL);
}
 
void* stream_reader(void* argv) {
 
    struct sigaction act;
    memset(&act, 0, sizeof(act));
 
    sigset_t mask;
    sigemptyset(&mask);
    sigaddset(&mask, SIGRTMIN);
 
    act.sa_handler = stream_sighandler;
    act.sa_mask = mask;
    sigaction(SIGRTMIN, &act, NULL);
 
    stream_t* s = (stream_t*)argv;
    s->lastread = timestamp();
    printf("reader start\n");
    sleep(3);
 
    strcpy(s->buffer, "hello!");
    s->readsize = strlen(s->buffer);
 
    long int dur = timestamp() - s->lastread;
    s->done = 1;
    pthread_mutex_unlock(&(s->readmutex));
    pthread_cond_signal(&(s->readcond));
 
    printf("reader done after %ld us\n", dur);
 
    return NULL;
}
 
 
void* stream_watcher(void* argv) {
    stream_t* s = (stream_t*)argv;
    printf("watcher start\n");
    while(s->done != 1) {
        printf("watch...\n");
        long dur = (timestamp() - s->lastread);
        if (dur > s->timeout) {
            printf("read timeout!\n");
            pthread_kill(*(s->reader_thr), SIGRTMIN);
            s->done = 1;
            pthread_mutex_unlock(&(s->readmutex));
            pthread_cond_signal(&(s->readcond));
            break;
        }
        msleep(500);
    }
    printf("watcher done\n");
    return NULL;
}
 
 
stream_t* stream_create(int timeout) {
    stream_t* s = (stream_t*)malloc(sizeof(stream_t));
 
    s->bufcap = 1024;
    s->buffer = (char*)malloc(s->bufcap);
    memset(s->buffer, 0, s->bufcap);
 
    s->reader_thr = (pthread_t*)malloc(sizeof(pthread_t));
    memset(s->reader_thr, 0, sizeof(pthread_t));
 
    s->watcher_thr = (pthread_t*)malloc(sizeof(pthread_t));
    memset(s->watcher_thr, 0, sizeof(pthread_t));
 
    s->sockfd = 0;
    s->readsize = 0;
    s->done = 0;
    s->timeout = timeout;
    return s;
}
 
void stream_close(stream_t* s) {
    free(s->buffer);
    free(s->reader_thr);
    free(s->watcher_thr);
    free(s);
}
 
int stream_recv(stream_t* s) {
    printf("read thread start\n");
 
    pthread_mutex_lock(&(s->readmutex));
 
    pthread_create(s->reader_thr, NULL, stream_reader, (void*)s);
    pthread_create(s->watcher_thr, NULL, stream_watcher, (void*)s);
 
    pthread_detach(*(s->reader_thr));
    pthread_detach(*(s->watcher_thr));
 
    while(s->done != 1) {
        pthread_cond_wait(&(s->readcond), &(s->readmutex));
        printf("while...\n");
    }
    printf("read thread done\n");
    return s->readsize;
}
 
int main(int argc, char **argv) {
    stream_t* s = NULL;
    int timeout = 0;
    int rv = 0;
    timeout = 2000;
    printf("create stream with timeout = %d ms\n", timeout);
 
    s = stream_create(timeout);
    if ((rv = stream_recv(s)) > 0) {
        char str[] = "";
        strncat(str, s->buffer, rv);
        printf("stream buffer: %s\n", str);
    } else {
        printf("stream buffer empty =(\n");
    }
    stream_close(s);
 
    timeout = 5000;
    printf("create stream with timeout = %d ms\n", timeout);
    s = stream_create(timeout);
    if ((rv = stream_recv(s)) > 0) {
        char str[] = "";
        strncat(str, s->buffer, rv);
        printf("stream buffer: %s\n", str);
    } else {
        printf("stream buffer empty =(\n");
    }
    stream_close(s);
 
    return 0;
}

Out

$ ./thr-reader
create stream with timeout = 2000 ms
read thread start
watcher start
watch...
reader start
watch...
watch...
watch...
watch...
read timeout!
watcher done
while...
read thread done
stream buffer empty =(

create stream with timeout = 5000 ms
read thread start
have signal, thread id 27354368, exit
reader start
watcher start
watch...
watch...
watch...
watch...
watch...
watch...
reader done after 3006 us
while...
read thread done
stream buffer: hello!
$