User Tools

Site Tools


RPC/IPC prototype/sample

Client

zcnt.c

zcnt.c
/*
 * Copyright 2004-2019 Oleg Borodin  <borodin@unix7.org>
 */
 
#include <stdlib.h>
#include <stdio.h>
#include <stdbool.h>
#include <stdint.h>
#include <stddef.h>
#include <unistd.h>
#include <string.h>
#include <fcntl.h>
 
#include <pthread.h>
#include <sys/event.h>
#include <sys/time.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <netinet/ip.h>
#include <netdb.h>
 
#include "socket.h"
#include "stream.h"
#include "message.h"
 
/* Pause for `dur` milliseconds by delegating to usleep(). */
void msleep(int dur) {
    /* usleep() counts in microseconds, so scale the millisecond value. */
    const int usec = dur * 1000;
    usleep(usec);
}
 
/*
 * Run one client session over an established stream: exchange HELLO
 * messages, send a single JSON request and print the JSON response.
 *
 * The stream stays owned by the caller, which closes and frees it.
 * Returns 0 when a response was printed, 1 on any failure.
 */
static int client_session(stream_t* s) {
    /* initialize communication */
    /* send hello */
    if (msg_send(s, NULL, MSG_TYPE_HELLO, MSG_HELLO_ID, 0) < 0) {
        return 1;
    }

    /* receive hello */
    int type = 0;
    int id = 0;
    if (msg_recv(s, NULL, &type, &id) < 0 || type != MSG_TYPE_HELLO) {
        return 1;
    }

    /* send request */
    char req[] = "{\"method\": \"hello\"}";
    int req_id = 1;
    printf("request=%s\n", req);
    if (msg_send(s, req, MSG_TYPE_JSON, req_id, strlen(req)) < 0) {
        return 1;
    }

    /* receive response */
    char* res = NULL;
    int res_id = 0;
    int rv = msg_recv(s, &res, &type, &res_id);

    /* check response */
    if (rv <= 0) {
        /*
         * No usable payload. On failure msg_recv has either allocated
         * nothing or already released the buffer itself, so `res` must be
         * neither printed nor freed here (doing so risks a double free).
         */
        return 1;
    }
    if (type != MSG_TYPE_JSON) {
        free(res);
        return 1;
    }
    printf("response=%s\n", res);
    free(res);
    return 0;
}

/*
 * Client entry point: connect to localhost:1024, run one session and
 * release the connection.
 *
 * Exit status is 0 on success and 1 when the connection, the handshake or
 * the request/response exchange fails.
 */
int main(int argc, char **argv) {
    (void)argc;
    (void)argv;

    int conn = connector_create("localhost", 1024);
    if (conn < 0) {
        fprintf(stderr, "cannot connect to server\n");
        return 1;
    }

    stream_t* s = stream_create(conn);
    if (s == NULL) {
        /* no stream wrapper took ownership, so close the descriptor here */
        close(conn);
        return 1;
    }

    int status = client_session(s);

    /* finalize communication */
    stream_close(s);
    stream_free(s);

    return status;
}

Server

zsrv.c

zsrv.c
/*
 * Copyright 2004-2019 Oleg Borodin  <borodin@unix7.org>
 */
 
#include <stdlib.h>
#include <stdio.h>
#include <stdbool.h>
#include <stdint.h>
#include <stddef.h>
#include <unistd.h>
#include <string.h>
#include <fcntl.h>
 
#include <pthread.h>
#include <sys/event.h>
#include <sys/time.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <netinet/ip.h>
#include <netdb.h>
 
#include "socket.h"
#include "stream.h"
#include "message.h"
 
 
/*
 * Serve one accepted connection: exchange HELLO messages, read a single
 * JSON request and answer it with a fixed JSON response.
 *
 * The stream stays owned by the caller, which closes and frees it on every
 * path, so an early return here can never leak the descriptor.
 */
static void serve_client(stream_t* s) {
    /* initialize communication */
    /* send hello */
    if (msg_send(s, NULL, MSG_TYPE_HELLO, MSG_HELLO_ID, 0) < 0) {
        return;
    }

    /* receive hello */
    int type = 0;
    int id = 0;
    int rv = msg_recv(s, NULL, &type, &id);
    printf("receive hello\n");

    /* check hello */
    if (rv < 0 || type != MSG_TYPE_HELLO) {
        return;
    }

    /* receive request */
    char* req = NULL;
    int req_id = 0;
    int req_type = 0;
    rv = msg_recv(s, &req, &req_type, &req_id);
    printf("receive request size=%d type=%d id=%d\n", rv, req_type, req_id);

    /* check request */
    if (rv <= 0) {
        /*
         * No usable payload. On failure msg_recv has either allocated
         * nothing or already released the buffer itself, so `req` must be
         * neither printed nor freed here.
         */
        return;
    }
    if (req_type != MSG_TYPE_JSON) {
        free(req);
        return;
    }
    printf("request=%s\n", req);
    free(req);

    /* send response */
    char res[] = "{\"response\": \"hello\"}";
    int res_id = req_id;
    /* log the response being sent (not the request buffer, which is freed) */
    printf("response=%s\n", res);
    if (msg_send(s, res, MSG_TYPE_JSON, res_id, strlen(res)) < 0) {
        fprintf(stderr, "cannot send response\n");
    }
}

/*
 * Server entry point: listen on port 1024 (backlog 512) and serve accepted
 * connections one at a time, forever.
 *
 * Returns 1 if the listening socket cannot be set up.
 */
int main(int argc, char **argv) {
    (void)argc;
    (void)argv;

    int sock = listener_create(1024, 512);
    if (sock < 0) {
        fprintf(stderr, "cannot create listener, exit\n");
        return 1;
    }

    while (1) {
        int newsock = accept(sock, NULL, NULL);
        if (newsock < 0) {
            /* never wrap an invalid descriptor in a stream; just retry */
            continue;
        }

        stream_t* s = stream_create(newsock);
        if (s == NULL) {
            close(newsock);
            continue;
        }

        serve_client(s);

        //msg_handle_req(s);
        stream_close(s);
        stream_free(s);
    }

    /* not reached: the accept loop above has no exit */
    close(sock);
    return 0;
}

Library

message.h

message.h
/*
 * Copyright 2004-2019 Oleg Borodin  <borodin@unix7.org>
 */
 
/* Protocol version stored in every header built by msg_hdr_create(). */
#define MSG_VER1            ((uint16_t)0x1)
 
/*
 * Values for the `type` field of msg_hdr_t.
 * HELLO is the handshake message both peers exchange first; JSON carries the
 * request and the response in the sample client and server. BIN, JSONREQ and
 * JSONRES are not used by the code shown on this page.
 */
#define MSG_TYPE_HELLO      ((uint16_t)1)
#define MSG_TYPE_BIN        ((uint16_t)2)
#define MSG_TYPE_JSON       ((uint16_t)3)
#define MSG_TYPE_JSONREQ    ((uint16_t)4)
#define MSG_TYPE_JSONRES    ((uint16_t)5)
 
/* Message id the client and server put on their HELLO messages. */
#define MSG_HELLO_ID        ((uint32_t)0x12345)
 
 
 
/*
 * In-memory form of a message header.
 *
 * msg_hdr_pack()/msg_hdr_unpack() serialise the fields in the order
 * ver, type, size, id (not the declaration order below), and
 * msg_hdr_size() reports the sum of the field sizes. The struct itself may
 * contain alignment padding, so sizeof(msg_hdr_t) can be larger than that
 * packed size.
 */
typedef struct msg_hdr {
    uint16_t ver;   /* protocol version, MSG_VER1 */
    uint32_t id;    /* message id chosen by the sender */
    uint16_t type;  /* one of the MSG_TYPE_* values */
    uint64_t size;  /* number of payload bytes that follow the header */
} msg_hdr_t;
 
 
/*
 * A header paired with a payload pointer.
 * NOTE(review): no function visible on this page uses this type, so buffer
 * ownership is not established here - confirm before relying on it.
 */
typedef struct msg {
    msg_hdr_t header;   /* header describing the payload */
    char* buffer;       /* payload bytes; presumably header.size long - verify */
} msg_t;
 
/*
 * Allocate a header stamped with MSG_VER1 and the given type, id and
 * payload size.
 *
 * Returns the new header, or NULL when allocation fails. Release it with
 * msg_hdr_free().
 */
msg_hdr_t* msg_hdr_create(uint16_t type, uint32_t id, size_t size) {
    /* calloc returns zero-filled storage, so every byte starts out as 0 */
    msg_hdr_t* hdr = calloc(1, sizeof *hdr);
    if (hdr != NULL) {
        hdr->ver = MSG_VER1;
        hdr->id = id;
        hdr->type = type;
        hdr->size = size;
    }
    return hdr;
}
 
/*
 * Serialise a header into a freshly allocated, zero-filled buffer of
 * sizeof(msg_hdr_t) bytes.
 *
 * The fields are copied back to back in the order ver, type, size, id using
 * memcpy, i.e. in the host's own representation with no byte-order
 * conversion. Returns NULL when allocation fails; the caller frees the
 * buffer.
 */
char* msg_hdr_pack(msg_hdr_t* h) {
    char* pack = calloc(1, sizeof(msg_hdr_t));
    if (pack == NULL) {
        return NULL;
    }

    /* cursor walks the output buffer as each field is appended */
    char* cursor = pack;

    memcpy(cursor, &h->ver, sizeof h->ver);
    cursor += sizeof h->ver;

    memcpy(cursor, &h->type, sizeof h->type);
    cursor += sizeof h->type;

    memcpy(cursor, &h->size, sizeof h->size);
    cursor += sizeof h->size;

    memcpy(cursor, &h->id, sizeof h->id);

    return pack;
}
 
/*
 * Rebuild a header from a buffer laid out by msg_hdr_pack(): the fields are
 * read back to back in the order ver, type, size, id, in the host's own
 * representation.
 *
 * Returns a newly allocated header (release with msg_hdr_free()), or NULL
 * when allocation fails.
 */
msg_hdr_t* msg_hdr_unpack(char* pack) {
    msg_hdr_t* hdr = calloc(1, sizeof *hdr);
    if (hdr == NULL) {
        return NULL;
    }

    /* cursor walks the input buffer as each field is consumed */
    const char* cursor = pack;

    memcpy(&hdr->ver, cursor, sizeof hdr->ver);
    cursor += sizeof hdr->ver;

    memcpy(&hdr->type, cursor, sizeof hdr->type);
    cursor += sizeof hdr->type;

    memcpy(&hdr->size, cursor, sizeof hdr->size);
    cursor += sizeof hdr->size;

    memcpy(&hdr->id, cursor, sizeof hdr->id);

    return hdr;
}
 
/*
 * Number of bytes a packed header occupies: the sum of the sizes of the
 * ver, type, size and id fields, without any struct padding.
 */
size_t msg_hdr_size() {
    /* sizeof never evaluates its operand, so the null pointer is not read */
    const msg_hdr_t* probe = NULL;
    return sizeof probe->ver
         + sizeof probe->type
         + sizeof probe->size
         + sizeof probe->id;
}
 
/* Release a header obtained from msg_hdr_create() or msg_hdr_unpack(). */
void msg_hdr_free(msg_hdr_t* h) {
    /* free(NULL) is defined to do nothing, so no explicit guard is needed */
    free(h);
}
 
/*
 * Allocate a zero-filled buffer of sizeof(msg_hdr_t) bytes for receiving a
 * packed header. Returns NULL when allocation fails; release it with
 * hdrbuf_free().
 */
char* hdrbuf_create() {
    /* calloc both allocates and zero-fills in one step */
    char* hdrbuf = calloc(1, sizeof(msg_hdr_t));
    return hdrbuf;
}
 
/* Release a buffer obtained from hdrbuf_create(); NULL is accepted. */
void hdrbuf_free(char* buf) {
    /* free(NULL) is defined to do nothing, so no explicit guard is needed */
    free(buf);
}
 
/*
 * Write exactly `size` bytes from `data` to the stream, repeating
 * stream_write() until everything has been handed over.
 *
 * Returns 0 on success, -1 if a write fails or makes no progress.
 */
static int msg_write_all(stream_t* s, char* data, size_t size) {
    size_t done = 0;
    while (done < size) {
        int n = stream_write(s, data + done, size - done);
        if (n <= 0) {
            return -1;
        }
        done += (size_t)n;
    }
    return 0;
}

/*
 * Send one message: a packed header followed by `size` payload bytes.
 *
 * s     stream to write to
 * data  payload; may be NULL only when size is 0
 * type  one of the MSG_TYPE_* values
 * id    message id stored in the header
 * size  payload length in bytes
 *
 * Returns the number of payload bytes sent (0 for a header-only message),
 * or -1 on bad arguments, allocation failure or write failure.
 */
int msg_send(stream_t* s, char* data, int type, int id, size_t size) {
    /* largest payload length the int return value can report */
    const size_t max_size = (size_t)(~0u >> 1);

    if (s == NULL || (data == NULL && size > 0) || size > max_size) {
        return -1;
    }

    int rv = -1;

    /* send request */
    msg_hdr_t* hdr = msg_hdr_create(type, id, size);
    char* hdrbuf = (hdr != NULL) ? msg_hdr_pack(hdr) : NULL;

    if (hdrbuf != NULL && msg_write_all(s, hdrbuf, msg_hdr_size()) == 0) {
        if (size == 0) {
            /* header-only message: nothing more to write */
            rv = 0;
        } else if (msg_write_all(s, data, size) == 0) {
            rv = (int)size;
        }
    }

    /* both release helpers accept NULL, so this covers every path */
    hdrbuf_free(hdrbuf);
    msg_hdr_free(hdr);
    return rv;
}
 
/*
 * Read exactly `size` bytes from the stream into `buffer`, repeating
 * stream_read() because a single read may deliver fewer bytes than asked.
 *
 * Returns 0 on success, -1 if a read fails or returns no data before the
 * requested amount has arrived.
 */
static int msg_read_all(stream_t* s, char* buffer, size_t size) {
    size_t done = 0;
    while (done < size) {
        int n = stream_read(s, buffer + done, size - done);
        if (n <= 0) {
            return -1;
        }
        done += (size_t)n;
    }
    return 0;
}

/*
 * Receive one message: a packed header followed by its payload.
 *
 * s     stream to read from
 * data  out: on success with a non-empty payload, *data receives a
 *       malloc-family buffer holding the payload plus a terminating NUL
 *       byte; the caller frees it. *data is left untouched when the payload
 *       is empty or when the call fails. May be NULL if the caller expects
 *       a header-only message.
 * type  out (optional): message type from the header
 * id    out (optional): message id from the header
 *
 * Returns the payload length in bytes (0 for a header-only message), or -1
 * on read failure, allocation failure, a payload arriving while data is
 * NULL, or a payload too large for the int return value. After -1 the
 * stream may hold unread bytes and should be closed.
 */
int msg_recv(stream_t* s, char** data, int* type, int* id) {
    if (s == NULL) {
        return -1;
    }

    /* receive header */
    char* hdrbuf = hdrbuf_create();
    if (hdrbuf == NULL) {
        return -1;
    }
    if (msg_read_all(s, hdrbuf, msg_hdr_size()) < 0) {
        hdrbuf_free(hdrbuf);
        return -1;
    }

    /* unpack header */
    msg_hdr_t* hdr = msg_hdr_unpack(hdrbuf);
    hdrbuf_free(hdrbuf);
    if (hdr == NULL) {
        return -1;
    }

    /* copy out what is needed, then drop the header on every path */
    const uint64_t size = hdr->size;
    if (id != NULL) { *id = hdr->id; }
    if (type != NULL) { *type = hdr->type; }
    msg_hdr_free(hdr);

    if (size == 0) {
        return 0;
    }

    /*
     * The length comes from the peer. Refuse what cannot be stored (no
     * destination) or reported (larger than the int return value).
     */
    const uint64_t max_size = (uint64_t)(~0u >> 1);
    if (data == NULL || size > max_size) {
        return -1;
    }

    /* receive data; the extra zero byte keeps the payload NUL-terminated */
    char* payload = calloc(1, (size_t)size + 1);
    if (payload == NULL) {
        return -1;
    }
    if (msg_read_all(s, payload, (size_t)size) < 0) {
        free(payload);
        return -1;
    }

    /* publish the buffer only once it is complete */
    *data = payload;
    return (int)size;
}

socket.h

socket.h
/*
 * Copyright 2004-2019 Oleg Borodin  <borodin@unix7.org>
 */
 
/*
 * Create a TCP/IPv4 socket listening on `port` on all local addresses.
 *
 * The socket gets SO_REUSEADDR and a 60 second receive timeout.
 *
 * port     TCP port to bind, in host byte order
 * backlog  queue length passed to listen()
 *
 * Returns the listening descriptor. Returns -1 (with the socket closed) if
 * the receive timeout cannot be set; every other failure prints a message
 * to stderr and terminates the process with exit(1).
 */
int listener_create(int port, int backlog) {
    int sock;
    if ((sock = socket(PF_INET, SOCK_STREAM, 0)) < 0) {
        fprintf(stderr, "cannot create socket, exit\n");
        exit(1);
    }

    int optval = 1;
    if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval)) < 0) {
        fprintf(stderr, "cannot set socket option, exit\n");
        exit(1);
    }

    struct timeval tv;
    tv.tv_sec = 60;
    tv.tv_usec = 0;
    if (setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)) < 0) {
        /* this path returns instead of exiting, so release the socket */
        close(sock);
        return -1;
    }

    /*
     * Fill a zeroed sockaddr_in directly rather than writing through a
     * cast of an uninitialised struct sockaddr.
     */
    struct sockaddr_in addr;
    memset(&addr, 0, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = htonl(INADDR_ANY);
    addr.sin_port = htons(port);
    addr.sin_len = sizeof(addr);

    if (bind(sock, (struct sockaddr*)&addr, sizeof(addr)) < 0) {
        fprintf(stderr, "cannot bind socket, exit\n");
        exit(1);
    }
    if (listen(sock, backlog) < 0) {
        fprintf(stderr, "cannot listen socket, exit\n");
        exit(1);
    }
    return sock;
}
 
/*
 * Resolve `hostname` to an IPv4 address and open a TCP connection to it on
 * `port`. The socket gets a 60 second receive timeout.
 *
 * hostname  host name or numeric address to resolve
 * port      TCP port, in host byte order
 *
 * Returns the connected descriptor, or -1 on any failure (resolution, no
 * IPv4 address, socket creation, option setting or connect). No descriptor
 * is left open on failure.
 */
int connector_create(char* hostname, int port) {
    struct addrinfo hints;
    memset(&hints, 0, sizeof(hints));
    hints.ai_family = AF_INET;
    hints.ai_socktype = SOCK_STREAM;
    hints.ai_protocol = IPPROTO_TCP;

    struct addrinfo* result = NULL;
    if (getaddrinfo(hostname, NULL, &hints, &result) != 0) {
        /* a failed lookup hands back no list, so there is nothing to free */
        return -1;
    }

    /* take a copy of the first IPv4 address so the list can be released */
    struct sockaddr_in addr;
    memset(&addr, 0, sizeof(addr));
    int found = 0;
    for (struct addrinfo* ptr = result; ptr != NULL; ptr = ptr->ai_next) {
        if (ptr->ai_family == AF_INET
                && ptr->ai_addr != NULL
                && ptr->ai_addrlen >= sizeof(addr)) {
            memcpy(&addr, ptr->ai_addr, sizeof(addr));
            found = 1;
            break;
        }
    }
    freeaddrinfo(result);
    if (!found) {
        return -1;
    }

    addr.sin_family = AF_INET;
    addr.sin_port = htons(port);
    addr.sin_len = sizeof(addr);

    /* create the socket last so earlier failures have nothing to clean up */
    int sock = socket(PF_INET, SOCK_STREAM, 0);
    if (sock < 0) {
        return -1;
    }

    struct timeval tv;
    tv.tv_sec = 60;
    tv.tv_usec = 0;
    if (setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)) < 0
            || connect(sock, (struct sockaddr*)&addr, sizeof(addr)) < 0) {
        close(sock);
        return -1;
    }
    return sock;
}

stream.h

stream.h
/*
 * Copyright 2004-2019 Oleg Borodin  <borodin@unix7.org>
 */
 
/*
 * Thin wrapper around a file descriptor; the stream_* functions read from,
 * write to and close this descriptor.
 */
typedef struct stream {
    int fd;     /* descriptor passed to stream_create() */
} stream_t;
 
/*
 * Wrap descriptor `fd` in a newly allocated stream.
 *
 * Returns the stream, or NULL when allocation fails; in that case `fd` is
 * left open and still belongs to the caller. Release with stream_free().
 */
stream_t* stream_create(int fd) {
    stream_t* s = malloc(sizeof *s);
    if (s == NULL) {
        return NULL;
    }
    s->fd = fd;
    return s;
}
 
/*
 * Read up to `size` bytes from the stream's descriptor into `buffer` with a
 * single read() call and return that call's result converted to int.
 */
int stream_read(stream_t* s, char* buffer, size_t size) {
    const int fd = s->fd;
    return (int)read(fd, buffer, size);
}
 
/*
 * Write up to `size` bytes from `buffer` to the stream's descriptor with a
 * single write() call and return that call's result converted to int.
 */
int stream_write(stream_t* s, char* buffer, size_t size) {
    const int fd = s->fd;
    return (int)write(fd, buffer, size);
}
 
/*
 * Close the stream's descriptor. The stream object itself stays allocated;
 * release it with stream_free().
 *
 * A NULL stream or an already closed one is ignored, so calling this twice
 * cannot close an unrelated descriptor that reused the same number.
 */
void stream_close(stream_t* s) {
    if (s == NULL || s->fd < 0) {
        return;
    }
    close(s->fd);
    /* mark as closed so a repeated call becomes a no-op */
    s->fd = -1;
}
 
/*
 * Release the memory of a stream created by stream_create(). The descriptor
 * is not closed here; NULL is accepted.
 */
void stream_free(stream_t* s) {
    /* free(NULL) is defined to do nothing, so no explicit guard is needed */
    free(s);
}

Out

$ ./zcnt 
request={"method": "hello"}
response={"response": "hello"}
$ ./zsrv 
receive hello
receive request size=19 type=3 id=1
request={"method": "hello"}
response={"method": "hello"}