Overview
TCPSender 将字节流转成不可靠的数据报,在后面的实验4中,TCPSender将和TCPReveiver一起实现TCPPeer。
TCP 是一种通过不可靠的数据报可靠地传送一对流控字节流(每个方向一个)的协议。TCP 连接中有两方参与,每一方都是另一方的对等体。每一方同时充当“发送方”(其自己的传出字节流)和“接收方”(传入字节流)。本实验将实现 TCP 的“发送方”部分,负责从字节流(由某个发送方应用程序创建和写入)读取数据,并将该流转换为一系列传出的 TCP 段。
How does the TCPSender know if a segment was lost?
- TCP发送一组TCPSenderMessages,每个消息包含传出字节流的子字符串以及序列号,并且在流的开头有SYN标志,在流的结尾有FIN标志。
- TCPSender还必须跟踪其他未完成的段,直到所占用的序列号被确认,TCPSender会调用tick()方法指示时间的流逝,如果有段发送太久没有被确认,需要重新发送。
以下是超时的定义:
- 每隔几毫秒,TCPSender的tick()就会被调用一次,告诉自上次调用该方法过去多少毫秒;
- 构造TCPSender会为其提供一个参数,告知其重传超时(RTO)的初始值,RTO是重复发送TCP段之前要等待的毫秒数,RTO会随时间变化而变化,但其初始值会保存在
initial_RTO_ms
变量中; - 需要实现一个超时重传计时器,当RTO过去,警报就会响起,只能使用tick()方法实现,不能直接获取时间;
- 每次发送数据段(序列长度非0)就要打开计时器(无论是第一次发送还是重传),以便在RTO毫秒过期;
- 当所有未完成的数据报都被确认后,停止重传计时器;
- 如果调用tick()并且重传计时器已经过期(6,7两条是重点),:
- 重新传输最早的未确认的TCP段;
- 如果窗口大小非0:
i). 跟踪连续重传的次数,TCPConnection需要根据连续重传的次数决定是否中止连接;
ii). 将RTO的值加倍,这被称为”指数退避“,以免进一步阻碍工作;
iii). 重置重传计时器并启动它,使其在 RTO 毫秒后过期.
- 当接收方向发送方发送一个确认成功收到新数据的消息时:(该消息比之前所有确认的消息的绝对序列号都大):
- 设置RTO为初始值;
- 如果发送方有任何未发送完成的数据,则重新启动计时器,以便在RTO毫秒后过期;
- 将连续重传次数置为0。
Implementing the TCP sender
void push( const TransmitFunction& transmit ) :
TCPSender被要求填充窗口,从流中读取并发送尽可能多的TCPSenderMessages,只要有新的字节要读取并且窗口中有可用空间。通过transmit函数发送。
需要确保每个TCPSendMessage都符合接收方窗口大小,并尽可能大,但是不要超过TCPConfig::MAX PAYLOAD SIZE
可以使用TCPSenderMessage::sequence length()方法计算一个段的序列号总数,SYN和FIN也各占一个序列号。
Tips: 假如接收方窗口大小为0,则应在发送消息时假装窗口大小为1。
void receive( const TCPReceiverMessage& msg ):
从接收方收到一条消息,传达窗口新的左边缘(= ackno)和右边缘(= ackno + 窗口大小)。TCPSender 应查看其未完成段的集合并删除任何现已完全确认的段(ackno 大于段中的所有序列号)。
void tick( uint64 t ms since last tick, const TransmitFunction& transmit );
时间已过去 — 自上次调用此方法以来已过去了一定数量的毫秒数。发送方可能需要重新传输未完成的段;它可以调用transmit()函数来执行此操作。
TCPSenderMessage make_empty_message() const;
TCPSender 应生成并发送长度为零的消息,并正确设置序列号。如果对等端想要发送 TCPReceiverMessage(例如,因为它需要确认来自对等端发送方的某些内容),并且需要生成与之配合的 TCPSenderMessage,则这很有用。
接下来是代码实现部分,首先实现一个定时器类,这个类负责记录时间,并判断是否超时:
class Timer
{
private:uint32_t _time_out = 0;uint32_t _time_cur = 0;bool open = false;public:Timer() = default;Timer( const uint32_t time_out ) : _time_out( time_out ) {};void stop() { open = false; }void set_time_out( const uint32_t time_out ) { _time_out = time_out; }uint32_t get_time_out() { return _time_out; }void restart(){open = true;_time_cur = 0;}void tick( const size_t ms_since_last_tick ) { _time_cur += ms_since_last_tick; }bool check_time_out() { return open && _time_cur >= _time_out; }bool is_open() { return open; }
};
然后修改TCPSender
类的成员变量和构造函数:
class TCPSender
{
public:/* Construct TCP sender with given default Retransmission Timeout and possible ISN */TCPSender( ByteStream&& input, Wrap32 isn, uint64_t initial_RTO_ms ): input_( std::move( input ) ), isn_( isn ), initial_RTO_ms_( initial_RTO_ms ), _timer( initial_RTO_ms_ ){}/* Generate an empty TCPSenderMessage */TCPSenderMessage make_empty_message() const;/* Receive and process a TCPReceiverMessage from the peer's receiver */void receive( const TCPReceiverMessage& msg );/* Type of the `transmit` function that the push and tick methods can use to send messages */using TransmitFunction = std::function<void( const TCPSenderMessage& )>;/* Push bytes from the outbound stream */void push( const TransmitFunction& transmit );/* Time has passed by the given # of milliseconds since the last time the tick() method was called */void tick( uint64_t ms_since_last_tick, const TransmitFunction& transmit );// Accessorsuint64_t sequence_numbers_in_flight() const; // How many sequence numbers are outstanding?uint64_t consecutive_retransmissions() const; // How many consecutive *re*transmissions have happened?Writer& writer() { return input_.writer(); }const Writer& writer() const { return input_.writer(); }// Access input stream reader, but const-only (can't read from outside)const Reader& reader() const { return input_.reader(); }private:// Variables initialized in constructorByteStream input_;Wrap32 isn_;uint64_t initial_RTO_ms_;// 补充成员变量Timer _timer; // 定时器uint64_t _consecutive_retransmissions = 0; // 连续重传次数uint64_t _sequence_numbers_in_flight = 0; // 未被确认的字节大小uint32_t _windows_size = 1; // 接收方窗口大小uint64_t _next_seqno { 0 }; // 下一个报文的序列号std::queue<std::pair<uint64_t, TCPSenderMessage>> _outstanding_seg {}; //已经发送但是没有被确认的segment队列bool SYN = false, FIN = false;
};
实现tcp_sender.cc文件中的各个函数:
push函数需要注意以下几点:
- push函数需要判断payload_size的大小,它取决于MAX_PAYLOAD_SIZE,接收方窗口大小和发送方剩余数据大小中最小的;
- FIN位的设置也应该判断接收方窗口是否能够容纳;
- RST位取决于input_是否有error。
receive函数需要注意以下几点:
- 如果接收到报文有RST,要对input置错;
- 接收到的ackno,要将所有ackno之前的报文都从未确认的队列中删除;
- 有报文被确认需要重置计时器,所有报文都被确认需要中止计时器。
除了以上各点,还需要注意绝对序列号和相对序列号的转换。
#include "tcp_sender.hh"
#include "tcp_config.hh"using namespace std;uint64_t TCPSender::sequence_numbers_in_flight() const
{// 返回未被确认的字节大小return _sequence_numbers_in_flight;
}uint64_t TCPSender::consecutive_retransmissions() const
{// 返回连续重传的次数return _consecutive_retransmissions;
}void TCPSender::push( const TransmitFunction& transmit )
{// 首先判断窗口大小uint32_t windows_size = max( _windows_size, (uint32_t)1 );// 接收方有足够大小的窗口才可以开始发送while ( _sequence_numbers_in_flight < windows_size ) {TCPSenderMessage msg;// SYN = false 说明还没有建立连接,先建立连接if ( !SYN ) {msg.SYN = true;SYN = true;}// 对于 payload_size的大小:// 首先不能超过MAX_PAYLOAD_SIZE// 其次接收方窗口必须有足够大小放下,用windows_size减去未被确认的字节数再减去SYN所占用的// 最后是不能超过input_中所存储的有效字节auto payload_size= min( TCPConfig::MAX_PAYLOAD_SIZE,min( windows_size - _sequence_numbers_in_flight - msg.SYN, input_.writer().bytes_pushed() ) );string payload;// 读取payload_size个字节,放入payload中for ( uint64_t i = 0; i < payload_size; ++i ) {payload += input_.reader().peek();input_.reader().pop( 1 );}msg.payload = move( payload );// 如果已经读取完input_中的数据,说明所有要发送的数据都已经发送,那么就将msg的FIN置为true// 需要注意的是要判断FIN是否能放下,因为之前选择payload的字段时没有考虑FIN,意味着如果SYN和payload就占满了windows_size,那么就不能传输FIN了if ( !FIN && input_.reader().is_finished()&& _sequence_numbers_in_flight + msg.sequence_length() < windows_size ) {msg.FIN = true;FIN = true;}int msg_len = msg.sequence_length();if ( msg.sequence_length() == 0 )break;// 从absolute_seqno转换到seqnomsg.seqno = Wrap32::wrap( _next_seqno, isn_ );msg.RST = input_.has_error();transmit( msg );// 发送数据后,如果没有打开计时器就打开计时器if ( !_timer.is_open() ) {_timer.restart();}// 将报文暂存在队列中,以便超时重传_outstanding_seg.push( { _next_seqno, move( msg ) } );// 更新未接收到的字节数和下一个报文的seqno_sequence_numbers_in_flight += msg_len;_next_seqno += msg_len;}
}TCPSenderMessage TCPSender::make_empty_message() const
{// 空的message只需要有RST和seqno,不需要payloadTCPSenderMessage msg;msg.RST = input_.has_error();msg.seqno = Wrap32::wrap( _next_seqno, isn_ );return msg;
}void TCPSender::receive( const TCPReceiverMessage& msg )
{// 如果接收到的报文没有ackno,就只更新_windows_sizeif ( !msg.ackno.has_value() ) {if ( msg.RST )input_.set_error();_windows_size = msg.window_size;return;}// 接收到的报文有RST,那就置错if ( msg.RST )input_.set_error();// 将报文的ackno转成absolute_acknouint64_t abs_ackno = msg.ackno.value().unwrap( isn_, _next_seqno );// 如果发现abs_ackno比_next_seqno还大,显然这是不合理的,就直接返回;if ( abs_ackno > _next_seqno )return;bool receive = false;// 将ackno之前的所有未确认的报文确认,并从队列中删除while ( !_outstanding_seg.empty() ) {auto& [seqno, outstanding_msg] = _outstanding_seg.front();if ( seqno + outstanding_msg.sequence_length() - 1 < abs_ackno ) {receive = true;_sequence_numbers_in_flight -= outstanding_msg.sequence_length();_outstanding_seg.pop();} elsebreak;}// 如果有报文确认,那么就将连续重传次数清零,并且重置计时器if ( receive ) {_consecutive_retransmissions = 0;_timer.set_time_out( initial_RTO_ms_ );_timer.restart();}// 如果所有报文都被确认,那么计时器中止if ( _sequence_numbers_in_flight == 0 ) {_timer.stop();}_windows_size = msg.window_size;
}void TCPSender::tick( uint64_t ms_since_last_tick, const TransmitFunction& transmit )
{_timer.tick( ms_since_last_tick );// 超时重传,并将RTO乘2if ( _timer.check_time_out() ) {transmit( _outstanding_seg.front().second );if ( _windows_size > 0 ) {++_consecutive_retransmissions;_timer.set_time_out( _timer.get_time_out() * 2 );}_timer.restart();}
}