tcp posix api
send发送
ssize_t nsend(int sockfd, const void *buf, size_t len, __attribute__((unused))int flags) {ssize_t length = 0;void* hostinfo = get_host_fromfd(sockfd);if (hostinfo == NULL) {return -1;}struct ln_tcp_stream* stream = (struct ln_tcp_stream*)hostinfo;if (stream->proto == IPPROTO_TCP) {struct ln_tcp_fragment* fragment = rte_malloc("send frag", sizeof(struct ln_tcp_fragment), 0);if (fragment == NULL) {return 0;}memset(fragment, 0, sizeof(struct ln_tcp_fragment));fragment->sport = stream->dport;fragment->dport = stream->sport;fragment->acknum = stream->recv_next;fragment->seqnum = stream->send_next;fragment->windows = LN_TCP_INITIAL_WINDOWS;fragment->tcp_flags = RTE_TCP_ACK_FLAG | RTE_TCP_PSH_FLAG;fragment->hdr_off = 0x50;fragment->data = rte_malloc("frag data", len + 1, 0);if (fragment->data == NULL) {rte_free(fragment);return -1;}memset(fragment->data, 0, len + 1);rte_memcpy(fragment->data, buf, len);fragment->length = len;length = fragment->length;rte_ring_mp_enqueue(stream->send_next, (void*)fragment);}return length;
}
recv接收
ssize_t nrecv(int sockfd, void *buf, size_t len, __attribute__((unused))int flags) {ssize_t length = 0;void* hostinfo = get_host_fromfd(sockfd);if (hostinfo == NULL) {return -1;}struct ln_tcp_stream* stream = (struct ln_tcp_stream*)hostinfo;if (stream->proto == IPPROTO_TCP) {struct ln_tcp_fragment* fragment = NULL;int nb_rev = 0;pthread_mutex_lock(&stream->mutex);while((nb_rev = rte_ring_mc_dequeue(stream->recvbuf, (void**)&fragment)) < 0) {pthread_cond_wait(&stream->cond, &stream->mutex);}pthread_mutex_unlock(&stream->mutex);if (fragment->length > len) {rte_memcpy(buf, fragment->data, len);int i;for (i = 0; i < fragment->length - len; i++) {fragment->data[i] = fragment->data[i + len];}fragment->length = fragment->length - len;length = fragment->length;rte_ring_mp_enqueue(stream->recvbuf, (void*)fragment);}else if (fragment->length == 0) {rte_free(fragment);}else {rte_memcpy(buf, fragment->data, len);length = fragment->length;rte_free(fragment->data);fragment->data = NULL;rte_free(fragment);}}return length;
}
数据包收发管理管理
接收
static int ln_tcp_enqueue_recvbuf(struct ln_tcp_stream* stream, struct rte_tcp_hdr* tcphdr, int tcplen) {struct ln_tcp_fragment* fragment = rte_malloc("tcp frag", sizeof(struct ln_tcp_fragment));if (fragment == NULL) {return -1;}memset(fragment, 0, sizeof(struct ln_tcp_fragment));fragment->dport = ntohs(tcphdr->dst_port);fragment->sport = ntohs(tcphdr->src_port);uint8_t hdrlen = tcphdr->data_off >> 4;int payloadlen = tcplen - hdrlen * 4;if (payloadlen > 0) {uint8_t* payload = (uint8_t*)tcphdr + hdrlen * 4;fragment->data = rte_malloc("frag data", payloadlen + 1, 0);if (fragment->data == NULL) {rte_free(fragment);return -1;}memset(fragment, 0, payloadlen + 1);rte_memcpy(fragment->data, payload, payloadlen);fragment->length = payloadlen;}else if (payloadlen == 0) {fragment->length = 0;fragment->data = NULL;}rte_ring_mp_enqueue(stream->recvbuf, (void*)fragment);pthread_mutex_lock(&stream->mutex);pthread_cond_signal(&stream->cond);pthread_mutex_unlock(&stream->mutex);return 0;
}
发送
static int ln_tcp_send_ackpkt(struct ln_tcp_stream* stream, struct rte_tcp_hdr* tcphdr) {struct ln_tcp_fragment* fragment = rte_malloc("ack frag", sizeof(struct ln_tcp_fragment), 0);if (fragment == NULL) {return -1;}memset(fragment, 0, sizeof(struct ln_tcp_fragment));fragment->sport = stream->dport;fragment->dport = stream->sport;fragment->acknum = stream->recv_next;fragment->seqnum = stream->send_next;fragment->windows = LN_TCP_INITIAL_WINDOWS;fragment->length = 0;fragment->data = NULL;fragment->hdr_off = 0x50;fragment->tcp_flags = RTE_TCP_ACK_FLAG;rte_ring_mp_enqueue(stream->sendbuf, (void*)fragment);return 0;
}
establish handle
static int ln_tcp_handle_close_established(struct ln_tcp_stream* stream, struct rte_tcp_hdr* tcphdr, int tcplen) {if (tcphdr->tcp_flags & RTE_TCP_SYN_FLAG) {}if (tcphdr->tcp_flags & RTE_TCP_PSH_FLAG) {ln_tcp_enqueue_recvbuf(stream, tcphdr, tcplen);uint8_t hdrlen = tcphdr->data_off >> 4;int payloadlen = tcphdr - hdrlen * 4;stream->recv_next = stream->recv_next + payloadlen;stream->send_next = ntohl(tcphdr->recv_ack);ln_tcp_send_ackpkt(stream, tcphdr);}if (tcphdr->tcp_flags & RTE_TCP_ACK_FLAG) {}if (tcphdr->tcp_flags & RTE_TCP_FIN_FLAG) {stream->status = LN_TCP_STATUS_CLOSE_WAIT;ln_tcp_enqueue_recvbuf(stream, tcphdr, tcphdr->data_off >> 4);stream->recv_next = stream->recv_next + 1;stream->send_next = ntohl(tcphdr->recv_ack);ln_tcp_send_ackpkt(stream, tcphdr);}return 0;
}
tcp server
static int tcp_server_entry(__attribute__((unused)) void* arg) {int sockfd = nsocket(AF_INET, SOCK_STREAM, 0);if (sockfd < 0) {return -1;}printf("tcp sockfd is %d\n", sockfd);struct sockaddr_in servaddr;memset(&servaddr, 0, sizeof(struct sockaddr));servaddr.sin_addr.s_addr = htonl(INADDR_ANY);servaddr.sin_family = AF_INET;servaddr.sin_port = htons(8888);nbind(sockfd, (struct sockaddr*)&servaddr, sizeof(servaddr));nlisten(sockfd, 10);while (1) {struct sockaddr_in client;socklen_t len = sizeof(client);int connfd = naccept(sockfd, (struct sockaddr*)&client, &len);char buff[BUFFER_SIZE] = {0};while (1) {int n = nrecv(connfd, buff, BUFFER_SIZE, 0); // blockif (n > 0) {printf("recv: %s\n", buff);nsend(connfd, buff, n, 0);}else if (n == 0) {nclose(connfd);break;}else {//nonblock}}}nclose(sockfd);
}
效果展示
回去之后调好了,有点难调
总结
到目前为止,IP/TCP和IP/UDP的协议栈都写完了,但是没有并发效果;这个后面会解决。下一步是探索一下协议的扩展,写一个dns服务器来看一下如何基于tcp或者udp来扩展协议。
项目地址
项目地址
参考资料:https://github.com/0voice