V148

Fork of RadioHead-148 by David Rimer

RH_TCP.cpp

Committer:
ilkaykozak
Date:
2017-10-25
Revision:
1:b7641da2b203
Parent:
0:ab4e012489ef

File content as of revision 1:b7641da2b203:

// RH_TCP.cpp
//
// Copyright (C) 2014 Mike McCauley
// $Id: RH_TCP.cpp,v 1.5 2015/08/13 02:45:47 mikem Exp $

#include <RadioHead.h>

// This can only build on Linux and compatible systems
#if (RH_PLATFORM == RH_PLATFORM_UNIX) 

#include <RH_TCP.h>
#include <sys/types.h>
#include <errno.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <sys/ioctl.h>
#include <netdb.h>
#include <string>

RH_TCP::RH_TCP(const char* server)
    : _server(server),
      _rxBufLen(0),
      _rxBufValid(false),
      _socket(-1)
{
}
    
bool RH_TCP::init()
{   
    if (!connectToServer())
	return false;
    return sendThisAddress(_thisAddress);
}
    
bool RH_TCP::connectToServer()
{
    struct addrinfo hints;
    struct addrinfo *result, *rp;
    int sfd, s;
    struct sockaddr_storage peer_addr;
    socklen_t peer_addr_len;

    memset(&hints, 0, sizeof(struct addrinfo));
    hints.ai_family = AF_UNSPEC;    // Allow IPv4 or IPv6
    hints.ai_socktype = SOCK_STREAM; // Stream socket
    hints.ai_flags = AI_PASSIVE;    // For wildcard IP address 
    hints.ai_protocol = 0;          // Any protocol 
    hints.ai_canonname = NULL;
    hints.ai_addr = NULL;
    hints.ai_next = NULL;
    
    std::string server(_server);
    std::string port("4000");
    size_t indexOfSeparator = server.find_first_of(':');
    if (indexOfSeparator != std::string::npos)
    {
	port = server.substr(indexOfSeparator+1);
	server.erase(indexOfSeparator);
    }

    s = getaddrinfo(server.c_str(), port.c_str(), &hints, &result);
    if (s != 0) 
    {
	fprintf(stderr, "RH_TCP::connect getaddrinfo failed: %s\n", gai_strerror(s));
	return false;
    }

    // getaddrinfo() returns a list of address structures.
    // Try each address until we successfully connect(2).
    // If socket(2) (or connect(2)) fails, we (close the socket
    // and) try the next address. */

    for (rp = result; rp != NULL; rp = rp->ai_next) 
    {
	_socket = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol);
	if (_socket == -1)
	    continue;
	
	if (connect(_socket, rp->ai_addr, rp->ai_addrlen) == 0)
	    break;                  /* Success */

	close(_socket);
    }

    if (rp == NULL) 
    {               /* No address succeeded */
	fprintf(stderr, "RH_TCP::connect could not connect to %s\n", _server);
	return false;
    }

    freeaddrinfo(result);           /* No longer needed */

    // Now make the socket non-blocking
    int on = 1;
    int rc = ioctl(_socket, FIONBIO, (char *)&on);
    if (rc < 0)
    {
	fprintf(stderr,"RH_TCP::init failed to set socket non-blocking: %s\n", strerror(errno));
	close(_socket);
	_socket = -1;
	return false;
    }
    return true;
}

void RH_TCP::clearRxBuf()
{
    _rxBufValid = false;
    _rxBufLen = 0;
}

void RH_TCP::checkForEvents()
{
    #define RH_TCP_SOCKETBUF_LEN 500
    static uint8_t socketBuf[RH_TCP_SOCKETBUF_LEN]; // Room for several messages
    static uint16_t socketBufLen = 0;

    // Read at most the amount of space we have left in the buffer
    ssize_t count = read(_socket, socketBuf + socketBufLen, sizeof(socketBuf) - socketBufLen);
    if (count < 0)
    {
	if (errno != EAGAIN)
	{
	    fprintf(stderr,"RH_TCP::checkForEvents read error: %s\n", strerror(errno));
	    exit(1);
	}
    }
    else if (count == 0)
    {
	// End of file
	fprintf(stderr,"RH_TCP::checkForEvents unexpected end of file on read\n");
	exit(1);
    }
    else
    {
	socketBufLen += count;
	while (socketBufLen >= 5)
	{
	    RHTcpTypeMessage* message = ((RHTcpTypeMessage*)socketBuf);
	    uint32_t len = ntohl(message->length);
	    uint32_t messageLen = len + sizeof(message->length);
	    if (len > sizeof(socketBuf) - sizeof(message->length))
	    {
		// Bogus length
		fprintf(stderr, "RH_TCP::checkForEvents read ridiculous length: %d. Corrupt message stream? Aborting\n", len);
		exit(1);
	    }
	    if (socketBufLen >= len + sizeof(message->length))
	    {
		// Got at least all of this message
		if (message->type == RH_TCP_MESSAGE_TYPE_PACKET && len >= 5)
		{
		    // REVISIT: need to check if we are actually receiving?
		    // Its a new packet, extract the headers and payload
		    RHTcpPacket* packet = ((RHTcpPacket*)socketBuf);
		    _rxHeaderTo    = packet->to;
		    _rxHeaderFrom  = packet->from;
		    _rxHeaderId    = packet->id;
		    _rxHeaderFlags = packet->flags;
		    uint32_t payloadLen = len - 5;
		    if (payloadLen <= sizeof(_rxBuf))
		    {
			// Enough room in our receiver buffer
			memcpy(_rxBuf, packet->payload, payloadLen);
			_rxBufLen = payloadLen;
			_rxBufFull = true;
		    }
		}
		// check for other message types here
		// Now remove the used message by copying the trailing bytes (maybe start of a new message?)
		// to the top of the buffer
		memcpy(socketBuf, socketBuf + messageLen, sizeof(socketBuf) - messageLen);
		socketBufLen -= messageLen;
	    }
	}
    }
}

void RH_TCP::validateRxBuf()
{
    // The headers have already been extracted
    if (_promiscuous ||
	_rxHeaderTo == _thisAddress ||
	_rxHeaderTo == RH_BROADCAST_ADDRESS)
    {
	_rxGood++;
	_rxBufValid = true;
    }
}

bool RH_TCP::available()
{
    if (_socket < 0)
	return false;
    checkForEvents();
    if (_rxBufFull)
    {
	validateRxBuf();
	_rxBufFull= false;
    }
    return _rxBufValid;
}

// Block until something is available
void RH_TCP::waitAvailable()
{
    waitAvailableTimeout(0); // 0 = Wait forever
}

// Block until something is available or timeout expires
bool RH_TCP::waitAvailableTimeout(uint16_t timeout)
{
    int            max_fd;
    fd_set         input;
    int            result;

    FD_ZERO(&input);
    FD_SET(_socket, &input);
    max_fd = _socket + 1;

    if (timeout)
    {
	struct timeval timer;
	// Timeout is in milliseconds
	timer.tv_sec  = timeout / 1000;
	timer.tv_usec = (timeout % 1000) * 1000;
	result = select(max_fd, &input, NULL, NULL, &timer);
    }
    else
    {
	result = select(max_fd, &input, NULL, NULL, NULL);
    }
    if (result < 0)
	fprintf(stderr, "RH_TCP::waitAvailableTimeout: select failed %s\n", strerror(errno));
    return result > 0;
}

bool RH_TCP::recv(uint8_t* buf, uint8_t* len)
{
    if (!available())
	return false;

    if (buf && len)
    {
	if (*len > _rxBufLen)
	    *len = _rxBufLen;
	memcpy(buf, _rxBuf, *len);
    }
    clearRxBuf();
    return true;
}

bool RH_TCP::send(const uint8_t* data, uint8_t len)
{
    bool ret = sendPacket(data, len);
    delay(10); // Wait for transmit to succeed. REVISIT: depends on length and speed
    return ret;
}

uint8_t RH_TCP::maxMessageLength()
{
    return RH_TCP_MAX_MESSAGE_LEN;
}

void RH_TCP::setThisAddress(uint8_t address)
{
    RHGenericDriver::setThisAddress(address);
    sendThisAddress(_thisAddress);
}

bool RH_TCP::sendThisAddress(uint8_t thisAddress)
{
    if (_socket < 0)
	return false;
    RHTcpThisAddress m;
    m.length = htonl(2);
    m.type = RH_TCP_MESSAGE_TYPE_THISADDRESS;
    m.thisAddress = thisAddress;
    ssize_t sent = write(_socket, &m, sizeof(m));
    return sent > 0;
}

bool RH_TCP::sendPacket(const uint8_t* data, uint8_t len)
{
    if (_socket < 0)
	return false;
    RHTcpPacket m;
    m.length = htonl(len + 4);
    m.type  = RH_TCP_MESSAGE_TYPE_PACKET;
    m.to    = _txHeaderTo;
    m.from  = _txHeaderFrom;
    m.id    = _txHeaderId;
    m.flags = _txHeaderFlags;
    memcpy(m.payload, data, len);
    ssize_t sent = write(_socket, &m, len + 8);
    return sent > 0;
}

#endif