SOL9 2.0 Class: SocketStream

 SOL9 C++ Class Library  SOL9 Samples  SOL9 Tutorial  SOL9 FAQ  SOL9 ClassTree  SOL9 ClassList 

Source code

/******************************************************************************
 *
 * Copyright (c) 1999-2008 Antillia.com TOSHIYUKI ARAI. ALL RIGHTS RESERVED.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions, and the following disclaimer.
 *  
 * 2. The name of the author may not be used to endorse or promote products
 *    derived from this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR 
 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. 
 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 
 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, 
 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR 
 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF 
 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 *
 *  SocketStream.h
 *
 *****************************************************************************/

// SOL++2000
// 1999.08.10 Modified
#pragma once

#include <sol\Socket.h>
#include <sol/StringBufferT.h>
//#include <sol\StringBuffer.h>

#include <sol\InetAddress.h>
#include <sol/StringT.h>
#include <sol/StringBufferT.h>

/**
 * SocketStream class
 */
namespace SOL {

class SocketStream :public Socket {
  char  line[1024];
  int    startPos;
  int    contentSize;

  char*  buffer;

  static const int BUFFER_SIZE = 1024*8;

public:
  /**
   *
   */
  SocketStream() 
    :Socket(AF_INET, SOCK_STREAM, IPPROTO_TCP),
    startPos(0),
    contentSize(0),
    buffer(new char[BUFFER_SIZE])
  {
  }
public:
  /**
   *
   */
  SocketStream(int domain, int protocol) 
    :Socket(domain, SOCK_STREAM, protocol),
    startPos(0),
    contentSize(0),
    buffer(new char[BUFFER_SIZE])
  {
  }

public:
  /**
   *
   */
  SocketStream(SOCKET soc) 
    :Socket(AF_INET, SOCK_STREAM, IPPROTO_TCP),
    startPos(0),
    contentSize(0),
    buffer(new char[BUFFER_SIZE])
  {
    setSocket(soc);
  }

public:
  /**
   *
   */
  ~SocketStream() 
  {
    if (this->buffer) {
      delete [] this->buffer; //1999.08.22
      this->buffer = NULL;
    }
  }


public:
  /**
   *
   */
  SocketStream* accept(sockaddr* addr, int* size)
  {
    BOOL rc = FALSE;
    SOCKET soc = getSocket();
    SocketStream* stream = NULL;

    SOCKET newfd = ::accept(soc, addr, size);
        
    if(newfd != INVALID_SOCKET) {
      stream = new SocketStream(newfd);
    }

    if(stream == NULL) {
      throw InvalidSocketException("SocketStream::accept,1,Failed to accept a socket", 
        WSAGetLastError());  
    }

    return stream;
  }

public:
  /**
   *
   */
  SocketStream* accept(InetAddress& address) 
  {
    BOOL rc = FALSE;
    SOCKET soc = getSocket();
    SocketStream* stream = NULL;
    
    sockaddr_in* addr = address.getAddress();
    int size  = address.getSize();
        
    SOCKET newfd = ::accept(soc, (sockaddr*)addr, &size);
        
    if(newfd != INVALID_SOCKET) {
      stream = new SocketStream(newfd);
    }

    if(stream == NULL) {
      throw InvalidSocketException("StreamSocket,1,accept,Failed to accept a socket", 
        WSAGetLastError());  
    }

    return stream;
  }


public:
  /**
   *
   */
  SocketStream* accept(unsigned short port, unsigned long address) 
  {
    BOOL   rc = FALSE;
    SOCKET soc = getSocket();
    SocketStream* stream = NULL;

    sockaddr_in client;
    memset(&client, 0, sizeof(client));
    client.sin_family = getDomain();
    client.sin_port = htons(port);
    client.sin_addr.s_addr = htonl(address);
    int size = sizeof(client);
    SOCKET newfd = ::accept(soc, (sockaddr*)&client, &size);
        
    if(newfd != INVALID_SOCKET) {
      stream = new SocketStream(newfd);
    }
    
    if(stream == NULL) {
      throw InvalidSocketException("SocketStream::accpet,1,Failed to accept a socket", 
        WSAGetLastError());  
    }
    return stream;
  }


public:
  /**
   *
   */
  SOCKET create(int domain=AF_INET, int protocol=IPPROTO_TCP) 
  {
    return Socket::create(domain, SOCK_STREAM, protocol);
  }


public:
  /**
   *
   */
  int connect(unsigned short port, hostent* hostEntry) 
  {
    SOCKET soc = getSocket();

    sockaddr_in server;
    memset(&server, 0, sizeof(server));
    server.sin_family = getDomain();
    server.sin_port   = htons(port);
    memcpy((char*)&server.sin_addr, 
                hostEntry->h_addr, hostEntry->h_length);
    return ::connect(soc, (sockaddr*)&server, sizeof(server));
    
  }

public:
  /**
   *
   */
  int connect(unsigned short port, unsigned long address) 
  {
    SOCKET soc  = getSocket();

    sockaddr_in server;
    memset(&server, 0, sizeof(server));

    server.sin_family = getDomain();
    server.sin_port   = htons(port);
    server.sin_addr.s_addr = htonl(address);
    return ::connect(soc, (sockaddr*)&server, sizeof(server));
    }


public:
  /**
   *
   */
  int connect(sockaddr* addr, size_t size)  
  {
    SOCKET soc  = getSocket();
    return ::connect(soc, addr, size);
  }


// 1999.07.03
public:
  /**
   *
   */
  int connect(InetAddress& addr)
  {
    SOCKET soc  = getSocket();
    return ::connect(soc, (sockaddr*)addr.getAddress(), addr.getSize());
  }

public:
  /**
   *
   */
  int readLine(char* buffer, int max)
  {
    bool crFound = false;

    char ch[1];
    int len = 0;
    int i = 0;
  
    int RETRY_COUNT_MAX = 10;
    int retryCount = 0;

    SOCKET soc = getSocket();

    while (i< (max-1)) {
      if (isReadable(soc)==false) {
        continue;
      }
      //This is a very slow method. 
      len = SocketStream::recv(ch, sizeof(ch), 0);

      if (len == SOCKET_ERROR && WSAGetLastError() ==WSAEWOULDBLOCK) {
        //Printf("Socket#readLine,1,SOCKET_ERRO,WSAEWOULDBLOCK\r\n");

        if (retryCount < RETRY_COUNT_MAX) {
          //Do retry to read the socket.
          Sleep(10);
          retryCount++;
          continue;
        } else {
          //Printf("Socket#readLine,retry over\r\n");
          break;
        }
      }

      if (len <=0) {
        break;
      }

      buffer[i++] = ch[0];
      
      if (ch[0] == CHAR_CR) {
        crFound = true;
        continue;
      }

      if (crFound && ch[0]== CHAR_LF) {
        break;
      }

      retryCount = 0;
    }

    buffer[i]= '\0';
    //Printf("SocketStream#readLine,2,lenghth=%d line=%s \r\n", i, buffer);

    return i;
  }

public:
  int readLine(StringBufferT<char>& buffer)
  {
    bool crFound = false;
    char ch[1];
    int len = 0;

    SOCKET soc = getSocket();

    while (true) {
      //2008/07/11
      if (isReadable(soc)==false) {
        continue;
      }

      len = recv(ch, 1, 0);

      if (len <= 0) {
        break;
      }
      
      //2008/07/12

      //If ch is 0x0d, then set crFount=true and continue this reading loop,
      //Modified not to append this 0x0d to buffer. 
      if (ch[0] == 0xd) {
        crFound = true;
        continue;
      }
      //Found a newline(0x0d,0x0a), break this loop.
      if (crFound && ch[0]== 0xa) {
        break;
      }
      //Other cases, append ch to buffer
      buffer.append(ch[0]);

    }
    return buffer.getContentSize();
  }


public:
  int getLine(StringBuffer& buffer)
  {
    Boolean found = False;
    for(int i = startPos; i<contentSize; i++) {
      //If found a newline(0x0d, 0x0a) in line, then set startPos, and break 

      if (line[i] == 0xd || line[i] == 0xa) {
        startPos = i+1;
        found = True;
        //2008/07/11
        break;
      }
      //else, then apppend a character to the buffer 

      else {
        buffer.append(line[i]);
      }
    }

    //Found a newline(0x0d, 0x0a), then return 
    if (found) {
      return buffer.getContentSize();
    }

    SOCKET soc = getSocket();

    while (1) {
      startPos = 0;
      //2008/07/11
      if (isReadable(soc)==false) {
        continue;
      }

      contentSize = SocketStream::recv(line, 1024, 0);
      if (contentSize <=0) {
        // Reached to end of data.
        return  -1;
      }
      for(int i = startPos; i<contentSize; i++) {
        if (line[i] == 0xd || line[i] == 0xa) {
          startPos = i+1;
          found = True;
          break;
        } else {
          buffer.append(line[i]);
        }
      }

      if (found) {
        break;
      }
    }

    return buffer.getContentSize();
  }



public:
  int getPeerName(InetAddress& address)
  {
    sockaddr_in* addr = address.getAddress();
    return getPeerName(addr);
  }

public:
  int getPeerName(sockaddr_in* addr) 
  {
    memset(addr, 0, sizeof(sockaddr_in));
    SOCKET soc = getSocket();
    int addrlen = sizeof(sockaddr_in);
    return ::getpeername(soc, (sockaddr*)addr, &addrlen);
    
  }

public:

  int listen(int backlog) 
  {
    SOCKET soc = getSocket();
    
    return ::listen(soc, backlog);
  }

public:
  int select(HWND hwnd, unsigned int msg, long event)
  { 
    SOCKET soc = getSocket();
    return ::WSAAsyncSelect(soc, hwnd, msg, event);
  }

private:
  //
  bool isReadable(SOCKET fd, int timeout=100)
  {
    bool rc = false;

    fd_set  readFD, writeFD;
  
    timeval tv;
    memset(&tv, 0, sizeof(tv));
    tv.tv_usec = timeout;
    
    FD_ZERO(&readFD);
    FD_ZERO(&writeFD);
  
    FD_SET(fd, &readFD);
    FD_SET(fd, &writeFD);

    if (::select(FD_SETSIZE, &readFD, &writeFD, 
        NULL,  &tv) != SOCKET_ERROR) {
  
      if (FD_ISSET(fd, &readFD)) {
        //OK. Readable
        rc = true;
      }
    } else {
      //SOCKET_ERROR
      if (WSAGetLastError()== WSAEWOULDBLOCK) {
        Sleep(2);
      }
    }
    return rc;
  }

private:
  //
  bool isWritable(SOCKET fd, int timeout=100)
  {
    bool rc = false;
  
    fd_set  readFD, writeFD;
    timeval tv;
    memset(&tv, 0, sizeof(tv));
    tv.tv_usec = timeout;

    FD_ZERO(&readFD);
    FD_ZERO(&writeFD);

    FD_SET(fd, &readFD);
    FD_SET(fd, &writeFD);

    if (::select(FD_SETSIZE, &readFD, &writeFD, 
        NULL,  &tv) != SOCKET_ERROR) {

      if (FD_ISSET(fd, &writeFD)) {
        //OK. Writable
        rc = true;
      }
    } else {
      //SOCKET_ERROR
      if (WSAGetLastError() == WSAEWOULDBLOCK) {
        Sleep(2);
      }
    }
    return rc;
  }


// 1999.08.22 
public:
  int printf(const char* format,...)
  {
    va_list pos;
    va_start(pos, format);
    vsprintf_s(buffer, BUFFER_SIZE, format, pos);
    va_end(pos);

    return sendAll(buffer);
  }

public:
  int recv(char* buff, int len, int flag=0) 
  {
    SOCKET soc = getSocket();

    return ::recv(soc, buff, len, flag);
  }
    
public:
  int send(const char* buff, int len, int flag=0) 
  {
    SOCKET soc = getSocket();
    return ::send(soc, buff, len, flag);
  }

// 1999.06.12 to-arai
// 1999.08.16 Added a timeout argument. Specify the timeout in second.
public:
  int sendAll(const char* buff, int len, int flag=0, long timeout=30)
  {
    
    int sentBytes = 0;
    
    SOCKET soc = getSocket();

    const char* ptr = buff;
    int  orglen = len;
    time_t startTime = time(NULL);
    
    const int RETRY_COUNT_MAX = 10;
    int   retryCount = 0;

    while (len >0) {
      
      time_t currentTime = time(NULL);

      if ((currentTime - startTime) > timeout) {
        if (sentBytes < orglen) { 
          // timeout;
          break;
        }
        if (sentBytes == orglen) {
          break;
        }
      }

      //Check if fd is writable
      if (isWritable(soc) == false) {
        continue;
      }

      int size = ::send(soc, ptr, len, flag);
      
      if (size == SOCKET_ERROR && WSAGetLastError() ==WSAEWOULDBLOCK) {
        //Socket error has happened, and if it were caused by blocking,
        //retry to send the buff data.
        if (retryCount < RETRY_COUNT_MAX) {
          Sleep(10);
          retryCount++;
          continue;
        } else {
          break;
        }
      }

      if (size < 0 && len <=0) {  // 1999.08.14
        //Something send-error has happened!
        break;
      }
  
      if (size >0) {
        sentBytes += size;
        ptr += size;
        len -= size;
      }

      retryCount = 0;
    }
    return sentBytes;
  }

public:
  int sendAll(const char* string) 
  {
    int rc = 0;
    if (string) {
      rc = sendAll(string, strlen(string), 0);
    }
    return rc;
  }

public:    
  //2009/10/18 String -> StringT
  int sendAll(StringT<char>& string) 
  {
    const char* text = (const char*)string;
    int rc = 0;
    if (text) {
      rc = sendAll(text, strlen(text), 0);
    }
    return rc;
  }
  
public:
  int sendAll(StringBufferT<char>& buffer) 
  {
    const char* text = buffer.getBuffer();
    int rc = 0;
    if (text) {
      rc = sendAll(text, strlen(text), 0);
    }
    return rc;
  }

};

}


Last modified: 5 May 2019

Copyright (c) 2009-2019 Antillia.com ALL RIGHTS RESERVED.