An attempt to implement a naive thread pool for use by a message queue server

The problem
This is a simple practice demo whose goal is to have a message queue server hand its work to a naive thread pool. The motivation comes from source code I happened to see at Cisco that used a blocking message queue, which means
the server depends entirely on every one of its message handlers being quick. Otherwise a single slow handler can block the whole message queue loop.
To run the demo, start the server first by passing the argument "server", then start the client by passing the argument "client". The client then prompts for the name of a file for the server to display in its terminal. Try picking a big text file.
However, I haven't really verified the performance of my algorithm, as this is only a simple practice exercise.
1. Opening a message queue requires a globally unique key id. By enumerating the "ch" argument passed to ftok, we can find a suitable one. Since the client and the server share the same code, both will arrive at the same key. A better alternative may be to simply close the message queue before opening it; this is believed to be the common way.
2. Passing parameters bothered me for a while, as I have developed a nasty habit, like Java programmers who assume every variable is dynamically allocated and therefore casually declare countless temporary variables in the hope that the Java runtime compiler will optimize the code.

#include <iostream>
#include <pthread.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <sys/types.h>
#include <string.h>
#include <stdio.h>
#include <errno.h>


using namespace std;

// errno is supplied by <errno.h> (included above) and must not be declared
// by hand: where the header defines it as a macro, a manual "extern int errno;"
// expands into a conflicting redeclaration and fails to compile as C++.

// Signature shared by pool jobs and by the routine handed to pthread_create.
typedef void* (*ThreadProc)(void*);

// Alias the stricmp spelling used by main onto strcasecmp.
#define stricmp strcasecmp

struct ThreadStruct
{
    ThreadStruct():func(NULL), arg(NULL), argSize(0), bResult(NULL)
    {
    }

    ThreadProc func;
    void* arg;
    int argSize;
    bool* bResult;
};

void* myThreadProc(void* pArg)
{
    ThreadStruct* pThreadStruct = (ThreadStruct*) pArg;
    (*pThreadStruct->func)(pThreadStruct->arg);
    *pThreadStruct->bResult = true;
    return NULL;
}

class ThreadPool
{
public:
    ThreadPool(int nMax = 20): m_nThreads(nMax)
    {
        pThreads = new pthread_t[m_nThreads];
        bAvailable = new bool[m_nThreads];
        pThreadStruct = new ThreadStruct[m_nThreads];
        for (int i = 0; i < m_nThreads; i ++)
        {
            bAvailable[i] = true;
        }
    }

    bool insertJob(ThreadProc proc, void* arg, int argSize)
    {
        int i;
        for (i = 0; i < m_nThreads; i ++)
        {
            if (bAvailable[i])
            {
                bAvailable[i] = false;
                break;
            }
        }
        if (i >= m_nThreads)
        {
            return false;
        }
        pThreadStruct[i].func = proc;
        if (pThreadStruct[i].argSize < argSize)
        {
            delete [] pThreadStruct[i].arg;
            pThreadStruct[i].arg = new char[argSize];
        }
        pThreadStruct[i].argSize = argSize;
        memcpy(pThreadStruct[i].arg, arg, argSize);

        pThreadStruct[i].bResult = bAvailable + i;

        if (pthread_create(pThreads + i, NULL, myThreadProc, pThreadStruct + i) != 0)
        {
            // reset to be available.
            bAvailable[i] = true;
            return false;
        }
        return true;
    }

    virtual ~ThreadPool()
    {
        delete []pThreads;
        delete []bAvailable;
        for (int i = 0; i < m_nThreads; i ++)
        {
            delete [] pThreadStruct[i].arg;
        }
        delete []pThreadStruct;
    }
private:
    pthread_t* pThreads;
    bool *bAvailable;
    ThreadStruct* pThreadStruct;
    int m_nThreads;
};

// Record handed to msgsnd/msgrcv: a long message type followed by the
// payload bytes.
struct MyMsgBuf
{
    enum { kPayloadBytes = 256 };  // capacity of the payload area

    long msgType;                  // message type (callers pass a priority here)
    char msg[kPayloadBytes];       // payload bytes
};

// Prints a one-line description of the current errno value to standard
// output. Codes without a dedicated message print "error unknown".
void handleErrno()
{
    const int code = errno;   // snapshot before doing any output
    const char* text = "error unknown";

    if (code == EACCES)
    {
        text = "(permission denied)";
    }
    else if (code == EEXIST)
    {
        text = "(Queue exists, cannot create)";
    }
    else if (code == EIDRM)
    {
        text = "(Queue is marked for deletion)";
    }
    else if (code == ENOENT)
    {
        text = "(Queue does not exist)";
    }
    else if (code == ENOMEM)
    {
        text = "(Not enough memory to create queue)";
    }
    else if (code == ENOSPC)
    {
        text = "(Maximum queue limit exceeded)";
    }

    std::cout << text << std::endl;
}

// Payload capacity of MyMsgBuf: the record size minus its leading long type field.
const int MyMsgBufSize = static_cast<int>(sizeof(MyMsgBuf) - sizeof(long));

class MyMsgQ
{
public:
    MyMsgQ(char ch = 'N'):m_key(-1), m_queueId(-1), bAvailable(false)
    {
        do
        {
            if ((m_key = ftok(__FILE__, ch))!= -1)
            {
                if ((m_queueId = msgget(m_key, IPC_CREAT|0200|0400))!= -1)
                {
                    bAvailable = true;
                }
                else
                {
                    ch ++;
                    handleErrno();
                }
            }
        }
        while (!bAvailable);
    }

    bool sendMsg(char* msg, int size, int nPriority = 128)
    {
        if (bAvailable && size < MyMsgBufSize)
        {
            MyMsgBuf msgBuf;
            msgBuf.msgType = nPriority;
            memcpy(msgBuf.msg, msg, size);
            if (msgsnd(m_queueId, &msgBuf, size, NULL) != -1)
            {
                return true;
            }
            else
            {
                handleErrno();
            }
        }
        return false;
    }
    bool recvMsg(char* msg, int& size, int nPriority = 128)
    {
        if (bAvailable)
        {
            MyMsgBuf msgBuf;
            int sizeRecv;

            if ((sizeRecv = msgrcv(m_queueId, &msgBuf, MyMsgBufSize, nPriority, 0)) == -1)
            {
                handleErrno();
                return false;
            }
            if (sizeRecv <= size)
            {
                memcpy(msg, msgBuf.msg, sizeRecv);
                return true;
            }
        }
        return false;
    }
private:
    key_t m_key;
    int m_queueId;
    bool bAvailable;
};

void runClient()
{
    int counter = 0;
    MyMsgQ myMsgQ;
    while (true)
    {
        char buffer[256];
        cout << "client msg number :"<<counter ++ <<endl;
        cout<<"please input a valid file name:"<<endl;
        cin >> buffer;
        if (strcmp(buffer, "quit") == 0)
        {
            break;
        }
        if (myMsgQ.sendMsg(buffer, strlen(buffer)+1))
        {
            cout<<"successfully sent msg "<< buffer<<endl;
        }
    }
}

// Pool job: writes the contents of the file named by arg to standard output.
//
// arg is a NUL-terminated path. A NULL arg or a file that cannot be opened
// produces no output. Always returns NULL.
void* handleFile(void* arg)
{
    const char* fileName = static_cast<const char*>(arg);
    if (fileName == NULL)
    {
        return NULL;
    }

    FILE* stream = fopen(fileName, "r");
    if (stream)
    {
        char buffer[256];
        size_t size;
        // Stop on a short read of zero bytes: this covers end of file AND
        // read errors (e.g. the path is a directory), where feof() would
        // never become true.
        while ((size = fread(buffer, 1, sizeof(buffer), stream)) > 0)
        {
            // Write by length so embedded NUL bytes do not truncate output.
            std::cout.write(buffer, static_cast<std::streamsize>(size));
        }
        fclose(stream);
    }
    return NULL;
}


void runServer()
{
    MyMsgQ myMsgQ;
    ThreadPool threadPool;
    while (true)
    {
        char buffer[256];
        int size = sizeof(buffer) - 1;
        if (myMsgQ.recvMsg(buffer, size))
        {
            buffer[size] = '\0';
            cout<<"successfully recv msg "<< buffer<<endl;
            if (threadPool.insertJob(&handleFile, buffer, strlen(buffer) + 1))
            {
                cout << "successfully assign job to thread"<<endl;
            }
        }
    }
}


// Prints the command-line usage line for the given program name.
void runHelp(char* program)
{
    std::cout << "usage: " << program;
    std::cout << " [client|server]" << std::endl;
}


using namespace std;

int main(int argc, char** argv)
{
    if (argc == 2)
    {
        if (stricmp(argv[1], "client") == 0)
        {
            runClient();
        }
        else
        {
            if (stricmp(argv[1], "server") == 0)
            {
                runServer();
            }
            else
            {
                runHelp(argv[0]);
            }
        }
    }
    else
    {
        runHelp(argv[0]);
    }

    return 0;
}

กก







                                 back.gif (341 bytes)       up.gif (335 bytes)         next.gif (337 bytes)