/*
 * network.cpp
 *
 *  Created on: March 4, 2022
 *      Author: pengzc
 */

#include "network.h"

// NOTE(review): the system header names were lost from this file; the list
// below was reconstructed from the APIs used here. Confirm against the
// original include list.
#include <arpa/inet.h>
#include <errno.h>
#include <fcntl.h>
#include <netdb.h>
#include <netinet/in.h>
#include <poll.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

#include <string>

#include "utils/Log.h"

namespace base {

/**
 * Switches a file descriptor to non-blocking mode.
 *
 * @param fd descriptor to modify
 * @return 0 on success, -1 if reading or writing the descriptor flags fails
 */
static int SetNonBlock(int fd) {
  int flags = fcntl(fd, F_GETFL, 0);
  if (flags < 0) {
    LOGE("Get flags error:%s", strerror(errno));
    return -1;
  }
  flags |= O_NONBLOCK;
  if (fcntl(fd, F_SETFL, flags) < 0) {
    LOGE("Set flags error:%s", strerror(errno));
    return -1;
  }
  return 0;
}

/**
 * Waits for events on a single descriptor.
 *
 * @param fd             descriptor to watch
 * @param events         poll(2) event mask to wait for
 * @param timeout_millis timeout handed to poll(2), in milliseconds
 * @param revents        receives the returned event mask when the result is >= 0
 * @return -1 on error, 0 on timeout, >0 when an event was reported
 */
static int Poll(int fd, int events, int timeout_millis, int* revents) {
  int ret = -1;
  struct pollfd pfd;
  memset(&pfd, 0, sizeof(pfd));
  pfd.fd = fd;
  pfd.events = events;
  // Retry once if the call was interrupted by a signal (EINTR).
  for (int i = 0; i < 2; i++) {
    ret = ::poll(&pfd, 1, timeout_millis);
    if (-1 == ret && EINTR == errno) {
      continue;
    }
    break;
  }
  if (ret >= 0) {
    *revents = pfd.revents;
  }
  return ret;
}

/**
 * Connects a socket with a timeout by switching it to non-blocking mode and
 * polling for completion.
 *
 * The descriptor is left in non-blocking mode afterwards.
 *
 * @param fd             socket descriptor
 * @param ai_addr        destination address
 * @param ai_addrlen     size of the destination address
 * @param timeout_millis how long to wait for the connection to complete
 * @return 0 when connect() succeeds immediately, SOCKET_SUCCESS when the
 *         connection completes within the timeout, SOCKET_TIMEOUT on timeout,
 *         SOCKET_ERROR otherwise
 */
static int NonBlockConnect(int fd, const struct sockaddr* ai_addr,
                           socklen_t ai_addrlen, int timeout_millis) {
  /* Put the socket into non-blocking mode. */
  if (SetNonBlock(fd) != 0) {
    return SOCKET_ERROR;
  }
  /* In blocking mode the default connect timeout on Linux is 75 s. */
  int ret = ::connect(fd, ai_addr, ai_addrlen);
  if (ret == 0) {
    return ret;
  }
  if (errno != EINPROGRESS) {
    LOGE("connect failure, %s(%d)", strerror(errno), errno);
    return SOCKET_ERROR;
  }
  //LOGD("Doing connection.\n");
  /* The connection is in progress; wait for the socket to report a result. */
  int revents = 0;
  ret = Poll(fd, POLLIN | POLLOUT | POLLHUP | POLLERR | POLLNVAL,
             timeout_millis, &revents);
  if (ret < 0) {
    LOGE("poll failure, %s(%d)", strerror(errno), errno);
    return SOCKET_ERROR;
  }
  if (ret == 0) {
    return SOCKET_TIMEOUT;
  }

  // SO_ERROR carries the outcome of the asynchronous connect. It is checked
  // unconditionally: a socket can be writable *and* readable on success (the
  // peer already sent data), which must not be reported as a failure.
  int err = 0;
  socklen_t errlen = sizeof(err);
  if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &errlen) == -1) {
    LOGE("getsockopt failure, %s", strerror(errno));
    return SOCKET_ERROR;
  }
  if (err != 0) {
    errno = err;
    LOGE("connect failure, %s(%d)", strerror(errno), errno);
    return SOCKET_ERROR;
  }
  if ((revents & POLLOUT) != 0) {
    return SOCKET_SUCCESS;
  }
  // No pending error but the socket never became writable (for example
  // POLLNVAL): treat as a failure, as before.
  return SOCKET_ERROR;
}

/**
 * Writes the whole buffer to a socket, retrying after a short delay when the
 * call is interrupted or the send buffer is full.
 *
 * @param sock_fd socket descriptor
 * @param buffer  bytes to send
 * @param buflen  number of bytes to send
 * @return buflen once everything was sent, 0 if there is nothing to send
 *         (null buffer or zero length), -1 on an unrecoverable send error
 */
static int Send(int sock_fd, const uint8_t* buffer, size_t buflen) {
  if (buffer == NULL || buflen == 0) {
    return 0;
  }
  const uint8_t* cursor = buffer;
  size_t remaining = buflen;
  while (remaining > 0) {
    // send() returns a signed count; keeping it signed is what makes the
    // error branch below reachable.
    const ssize_t sent = send(sock_fd, cursor, remaining, 0);
    if (sent < 0) {
      // EINTR: interrupted by a signal, the write can simply be retried.
      // EAGAIN/EWOULDBLOCK: non-blocking socket whose send buffer is full;
      // wait a little and retry.
      if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
        usleep(1000 * 100);
        continue;
      }
      return -1;
    }
    cursor += sent;
    remaining -= static_cast<size_t>(sent);
  }
  // LOGD("send success");
  return static_cast<int>(buflen);
}

/** Creates a socket object that is not connected (descriptor is -1). */
Socket::Socket() {
  socket_ = -1;
}

/**
 * Does not close the descriptor; callers release it with Close().
 */
Socket::~Socket() {
}

/**
 * Resolves host:port and connects a TCP socket to the first resolved address.
 *
 * @param host           host name or numeric address
 * @param port           port number
 * @param timeout_millis connect timeout in milliseconds
 * @return 0 on success; SOCKET_ERROR if already connected or the connect
 *         fails; SOCKET_GET_ADDRESS_INFO if resolution fails; SOCKET_INVALID
 *         if the socket cannot be created; SOCKET_TIMEOUT on timeout
 */
int Socket::Connect(const std::string& host, int port, int timeout_millis) {
  if (socket_ >= 0) {
    LOGE("already connect");
    return SOCKET_ERROR;
  }
  struct addrinfo host_info;
  struct addrinfo* host_info_list = NULL;
  memset(&host_info, 0, sizeof(host_info));
  host_info.ai_family = AF_UNSPEC;
  host_info.ai_socktype = SOCK_STREAM;

  char port_str[32] = {0};
  snprintf(port_str, sizeof(port_str), "%d", port);

  int ret = getaddrinfo(host.c_str(), port_str, &host_info, &host_info_list);
  if (ret != 0) {
    LOGE("host could not be resolved");
    return SOCKET_GET_ADDRESS_INFO;
  }

  int sock = socket(host_info_list->ai_family, host_info_list->ai_socktype,
                    host_info_list->ai_protocol);
  if (sock == -1) {
    LOGE("could not open socket, %s(%d)", strerror(errno), errno);
    freeaddrinfo(host_info_list);
    return SOCKET_INVALID;
  }

  ret = NonBlockConnect(sock, host_info_list->ai_addr,
                        host_info_list->ai_addrlen, timeout_millis);
  freeaddrinfo(host_info_list);
  if (ret != 0) {
    close(sock);
    return ret;
  }
  socket_ = sock;
  return 0;
}

/**
 * Sends data_len bytes from data over the connected socket.
 *
 * @return number of bytes sent (data_len) on success, 0 when there is nothing
 *         to send, SOCKET_INVALID when not connected, -1 on a send error
 */
int Socket::Write(uint8_t* data, int data_len) {
  if (socket_ < 0) {
    return SOCKET_INVALID;
  }
  // A negative length would wrap to a huge size_t inside Send().
  if (data == NULL || data_len <= 0) {
    return 0;
  }
  return Send(socket_, data, static_cast<size_t>(data_len));
}

/**
 * Waits up to timeout_millis for data and reads at most buffer_len bytes.
 *
 * @return >0 number of bytes read; 0 when the peer closed the connection (the
 *         socket is closed here as well); SOCKET_TIMEOUT when nothing arrived
 *         in time or the read should be retried; SOCKET_INVALID when not
 *         connected; SOCKET_ERROR on failure (the socket is closed on a recv
 *         error)
 */
int Socket::Read(uint8_t* buffer, int buffer_len, int timeout_millis) {
  if (socket_ < 0) {
    // Not connected.
    return SOCKET_INVALID;
  }
  int revents = 0;
  int ret = Poll(socket_, POLLIN | POLLHUP | POLLERR | POLLNVAL,
                 timeout_millis, &revents);
  if (ret < 0) {
    return SOCKET_ERROR;
  }
  if (ret == 0) {
    return SOCKET_TIMEOUT;
  }
  int recv_bytes = recv(socket_, buffer, buffer_len, 0);
  if (recv_bytes == 0) {
    // The connection has been closed by the peer.
    //LOGD("recv 0, close it");
    Close();
    return 0;
  } else if (recv_bytes < 0) {
    if ((errno == EINTR) || (errno == EWOULDBLOCK) || (errno == EAGAIN)) {
      // Transient condition: the caller may simply call Read() again.
      return SOCKET_TIMEOUT;
    } else {
      LOGE("recv: %s(%d)", strerror(errno), errno);
      Close();
      return SOCKET_ERROR;
    }
  }
  return recv_bytes;
}

/** Closes the descriptor if one is open and marks the socket as unconnected. */
int Socket::Close() {
  if (socket_ >= 0) {
    close(socket_);
    socket_ = -1;
  }
  return 0;
}

/** Returns a copy of the address stored in this socket (filled in by ServerSocket::Accept). */
InetAddress Socket::inet_address() const {
  return address_;
}

// ServerSocket

/**
 * Creates an IPv4 TCP socket with SO_REUSEADDR, binds it to INADDR_ANY:port
 * and starts listening with a backlog of 64.
 *
 * On any failure the error is logged and server_socket_ is left negative.
 */
ServerSocket::ServerSocket(int port) {
  server_socket_ = socket(PF_INET, SOCK_STREAM, 0);
  if (server_socket_ < 0) {
    LOGE("socket failure, %s(%d)", strerror(errno), errno);
    return;
  }
  int on = 1;
  if (setsockopt(server_socket_, SOL_SOCKET, SO_REUSEADDR, &on,
                 socklen_t(sizeof(on))) != 0) {
    LOGE("setsockopt failure, %s(%d)", strerror(errno), errno);
    goto failure;
  }

  memset(&server_addr_, 0, sizeof(server_addr_));
  server_addr_.sin_family = AF_INET;
  server_addr_.sin_port = htons(port);
  server_addr_.sin_addr.s_addr = htonl(INADDR_ANY);

  if (bind(server_socket_, (struct sockaddr*) &server_addr_,
           socklen_t(sizeof(server_addr_))) < 0) {
    LOGE("bind failure, %s(%d)", strerror(errno), errno);
    goto failure;
  }
  if (listen(server_socket_, 64) < 0) {
    LOGE("listen failure, %s(%d)", strerror(errno), errno);
    goto failure;
  }
  return;

failure:
  if (server_socket_ >= 0) {
    close(server_socket_);
    server_socket_ = -1;
  }
}

/** Closes the listening socket if it is open. */
ServerSocket::~ServerSocket() {
  if (server_socket_ >= 0) {
    close(server_socket_);
    server_socket_ = -1;
  }
}

/**
 * Accepts one incoming connection and stores its descriptor and peer address
 * in *socket.
 *
 * @param socket receives the accepted connection; must not be null
 * @return the client descriptor on success; SOCKET_INVALID if the server
 *         socket is not open or socket is null; SOCKET_ERROR if accept() fails
 */
int ServerSocket::Accept(Socket* socket) {
  if (server_socket_ < 0) {
    return SOCKET_INVALID;
  }
  if (socket == NULL) {
    LOGE("argument socket must be not null");
    return SOCKET_INVALID;
  }
  socklen_t addr_len = socklen_t(sizeof(socket->address_.address_));
  int client = accept(server_socket_,
                      (struct sockaddr*) &socket->address_.address_, &addr_len);
  if (client < 0) {
    LOGD("accept failure, %s(%d)", strerror(errno), errno);
    return SOCKET_ERROR;
  }
  socket->socket_ = client;
  return client;
}

/** Creates an all-zero (uninitialized) address. */
InetAddress::InetAddress() {
  memset(&address_, 0, sizeof(address_));
}

/**
 * Creates an IPv4 address from a dotted-decimal host string and a port.
 * The host is converted with inet_addr(), so host names are not resolved.
 */
InetAddress::InetAddress(const std::string& host, int port) {
  memset(&address_, 0, sizeof(address_));
  address_.sin_family = AF_INET;
  address_.sin_port = htons(port);
  address_.sin_addr.s_addr = inet_addr(host.c_str());
}

/**
 * Returns the address in dotted-decimal form, or an empty string when the
 * address is still all zeros (never initialized) or cannot be converted.
 */
std::string InetAddress::ipv4() const {
  struct sockaddr_in zero;
  memset(&zero, 0, sizeof(zero));
  if (0 == memcmp(&address_, &zero, sizeof(zero))) {
    // An uninitialized address yields an empty string.
    return "";
  }
  // inet_ntop writes into a caller-owned buffer, unlike inet_ntoa which
  // returns a pointer to a shared static buffer.
  char text[INET_ADDRSTRLEN] = {0};
  if (inet_ntop(AF_INET, &address_.sin_addr, text, sizeof(text)) == NULL) {
    return "";
  }
  return text;
}

/** Creates an unbound IPv4 UDP socket; on failure socket_ stays negative. */
DatagramSocket::DatagramSocket() {
  socket_ = socket(AF_INET, SOCK_DGRAM, 0);
  if (socket_ < 0) {
    LOGE("socket failure, %s(%d)", strerror(errno), errno);
  }
}

/**
 * Creates an IPv4 UDP socket bound to INADDR_ANY:port.
 * If creation or bind fails the error is logged and socket_ is left negative.
 */
DatagramSocket::DatagramSocket(int port) {
  socket_ = socket(AF_INET, SOCK_DGRAM, 0);
  if (socket_ < 0) {
    LOGE("socket failure, %s(%d)", strerror(errno), errno);
    return;
  }
  struct sockaddr_in ser_addr;
  memset(&ser_addr, 0, sizeof(ser_addr));
  ser_addr.sin_family = AF_INET;
  ser_addr.sin_addr.s_addr = htonl(INADDR_ANY);
  ser_addr.sin_port = htons(port);

  int ret = bind(socket_, (struct sockaddr*) &ser_addr,
                 socklen_t(sizeof(ser_addr)));
  if (ret < 0) {
    LOGE("bind failure, %s(%d)", strerror(errno), errno);
    close(socket_);
    socket_ = -1;
  }
}

/**
 * Does not close the descriptor; callers release it with Close().
 */
DatagramSocket::~DatagramSocket() {
}

/**
 * Sends one datagram to the given address.
 *
 * @return the sendto() result (bytes sent, or -1 on error), or SOCKET_INVALID
 *         when the socket is not open
 */
int DatagramSocket::SendTo(const InetAddress& address, const uint8_t* data,
                           int len) {
  if (socket_ < 0) {
    return SOCKET_INVALID;
  }
  return sendto(socket_, data, len, 0, (sockaddr*) &address.address_,
                socklen_t(sizeof(address.address_)));
}

/**
 * Waits up to timeout_millis for a datagram and copies it into buffer.
 *
 * @param address        optional; receives the sender address when not null
 * @param buffer         destination buffer
 * @param buffer_length  capacity of buffer in bytes
 * @param timeout_millis how long to wait for a datagram
 * @return number of bytes received (0 for an empty datagram); SOCKET_TIMEOUT
 *         when nothing arrived in time or the read should be retried;
 *         SOCKET_INVALID when the socket is not open; SOCKET_ERROR on failure
 *         (the socket is closed on a recvfrom error)
 */
int DatagramSocket::Receive(InetAddress* address, uint8_t* buffer,
                            int buffer_length, int timeout_millis) {
  if (socket_ < 0) {
    return SOCKET_INVALID;
  }
  int revents = 0;
  int ret = Poll(socket_, POLLIN, timeout_millis, &revents);
  if (ret < 0) {
    return SOCKET_ERROR;
  }
  if (ret == 0) {
    return SOCKET_TIMEOUT;
  }
  sockaddr* addr = NULL;
  socklen_t addr_len = 0;
  if (address != NULL) {
    addr = (sockaddr*) &address->address_;
    addr_len = sizeof(address->address_);
  }
  ret = recvfrom(socket_, buffer, buffer_length, 0, addr, &addr_len);
  if (ret == 0) {
    // UDP is connectionless: a zero-length datagram is a valid message, not
    // an end-of-stream marker, so the socket must stay open.
    return 0;
  } else if (ret < 0) {
    if ((errno == EINTR) || (errno == EWOULDBLOCK) || (errno == EAGAIN)) {
      // Transient condition: the caller may simply call Receive() again.
      return SOCKET_TIMEOUT;
    } else {
      LOGE("recv: %s(%d)", strerror(errno), errno);
      Close();
      return SOCKET_ERROR;
    }
  }
  return ret;
}

/** Closes the descriptor if one is open and marks the socket as closed. */
int DatagramSocket::Close() {
  if (socket_ >= 0) {
    close(socket_);
    socket_ = -1;
  }
  return 0;
}

#if 0
  // Test program segment (TCP)
  std::thread server_thread([](){
    net::ServerSocket server(8080);
    while(true) {
      net::Socket so;
      LOGE("服务端 等待连接");
      int ret = server.Accept(&so);
      if (ret < 0) {
        LOGE("Accept failure %d", ret);
        break;
      }
      const char* msg = "hello";
      uint8_t buf[1024] = {0};
      ret = so.Read(buf, int(sizeof(buf)), 2000);
      LOGE("服务端 读取 %d %s", ret, buf);
      ret = so.Write((uint8_t*)msg, strlen(msg));
      LOGE("服务端 发送 %d %s", ret, ret == strlen(msg) ? "成功" : "失败");
      so.Close();
    }
  });
  server_thread.detach();

  int test_count = 100000;
  usleep(1000 * 1000);
  while(test_count--) {
    LOGD("test count %d", test_count);
    net::Socket so;
    //net::Conn* conn = net::Dial("tcp", "14.215.177.38:80");
    // int ret = so.Connect("14.215.177.38", 80, 3000);
    int ret = so.Connect("127.0.0.1", 8080, 3000);
    // int ret = so.Connect("www.baidu.com", 80, 3000);
    LOGD("连接 %d", ret);
    if (ret == 0) {
      uint8_t buf[2048] = {0};
      const char* req = "GET / HTTP/1.1\r\nConnection: close\r\n\r\n";
      // Send the request.
      ret = so.Write((uint8_t*)req, strlen(req));
      LOGD("客户端 写 %d", ret);
      while (true) {
        // Read with a 1000 ms timeout.
        int n = so.Read(buf, int(sizeof(buf)) - 1, 1000);
        if (n > 0) {
          buf[n] = 0;
          LOGD("客户端 读取 %d字节: %s", n, buf);
        } else if (n == 0) {
          LOGD("客户端 连接正常断开");
          break;
        } else {
          if (n == net::SOCKET_TIMEOUT) {
            LOGD("客户端 读取超时, 继续读");
            continue;
          } else {
            LOGE("客户端 出错 %d", n);
            break;
          }
        }
      }
      // Close the connection.
      so.Close();
      // Release memory.
    }
  }
#endif

// UDP test
#if 0
  {
    std::thread server_thread([](){
      net::DatagramSocket server(8080);
      while(true) {
        LOGE("服务端 等待接收");
        net::InetAddress from_address;
        uint8_t buffer[1024] = {0};
        int ret = server.Receive(&from_address, buffer, int(sizeof(buffer)), 2000);
        if (ret == net::SOCKET_TIMEOUT) {
          LOGD("服务端接收超时");
          continue;
        }
        if (ret < 0) {
          LOGE("服务端接收出错 %d", ret);
          break;
        }
        LOGE("服务端 读取 来自 %s %d %s", from_address.ipv4().c_str(), ret, buffer);
        const char* msg = "hello";
        ret = server.SendTo(from_address, (uint8_t*)msg, int(strlen(msg)));
        LOGE("服务端 发送 %d %s", ret, ret == strlen(msg) ? "成功" : "失败");
      }
    });
    server_thread.detach();
  }
  usleep(1000 * 1000);
  {
    // UDP client
    net::InetAddress address("127.0.0.1", 8080);
    net::DatagramSocket so;
    while (true) {
      const char* msg = "I am from client";
      int ret = so.SendTo(address, (uint8_t*)msg, int(strlen(msg)));
      LOGE("客户端 发送 %d %s", ret, ret == strlen(msg) ? "成功" : "失败");
      uint8_t buffer[128] = {0};
      ret = so.Receive(&address, buffer, int(sizeof(buffer)), 2000);
      if (ret == net::SOCKET_TIMEOUT) {
        LOGE("客户端 接收超时");
        continue;
      }
      if (ret < 0) {
        LOGE("客户端 接收失败");
        break;
      }
      LOGE("客户端 接收 来自 %s %d %s", address.ipv4().c_str(), ret, buffer);
    }
  }
#endif

} /* namespace base */