文章目录
- 0.简介
- 1.PG通讯协议
- 1.1 消息格式
- 1.2 消息交互流程
- 1.2.1 启动流程
- 1.2.2 简单查询流程
- 1.2.3 扩展查询
- 1.2.3.1 pipelining
- 1.2.4 取消流程
- 1.2.5 结束流程
- 1.2.6 copy流程
- 1.2.7 错误和通知
0.简介
之前文章对于PG的内部模块做了一些介绍,接下来对PG和外部交互的部分进行介绍,本文主要介绍通讯协议的概念和PG中的消息格式和交互流程。
1.PG通讯协议
通讯协议,就是通信双方对数据传输和解析的一种约定。遵守协议的双方可以进行正常的“交流”。对于PG,其外部的客户端(像psql,JDBC,PgAdmin等)都需要遵循PG的通信协议才能正确的进行通信,下面主要针对PG 7.4 版本开始使用的 Protocol v3.0 版本介绍,该协议支持 TCP/IP 和 Unix 域套接字。下面从数据格式,消息类型和传输流程来分别进行介绍。
1.1 消息格式
消息的格式是:第一个字节标识消息的类型,随后用四个字节标识消息的长度,然后是消息体。
其解析过程可以参考SocketBackend函数
static int
SocketBackend(StringInfo inBuf)
{int qtype;/** Get message type code from the frontend.*/HOLD_CANCEL_INTERRUPTS();pq_startmsgread();qtype = pq_getbyte();if (qtype == EOF) /* frontend disconnected */{//}/** Validate message type code before trying to read body; if we have lost* sync, better to say "command unknown" than to run out of memory because* we used garbage as a length word.** This also gives us a place to set the doing_extended_query_message flag* as soon as possible.*/switch (qtype){case 'Q': /* simple query */doing_extended_query_message = false;if (PG_PROTOCOL_MAJOR(FrontendProtocol) < 3){/* old style without length word; convert */if (pq_getstring(inBuf)){if (IsTransactionState())ereport(COMMERROR,(errcode(ERRCODE_CONNECTION_FAILURE),errmsg("unexpected EOF on client connection with an open transaction")));else{/** Can't send DEBUG log messages to client at this* point. Since we're disconnecting right away, we* don't need to restore whereToSendOutput.*/whereToSendOutput = DestNone;ereport(DEBUG1,(errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST),errmsg("unexpected EOF on client connection")));}return EOF;}}break;case 'F': /* fastpath function call */doing_extended_query_message = false;if (PG_PROTOCOL_MAJOR(FrontendProtocol) < 3){if (GetOldFunctionMessage(inBuf)){if (IsTransactionState())ereport(COMMERROR,(errcode(ERRCODE_CONNECTION_FAILURE),errmsg("unexpected EOF on client connection with an open transaction")));else{/** Can't send DEBUG log messages to client at this* point. Since we're disconnecting right away, we* don't need to restore whereToSendOutput.*/whereToSendOutput = DestNone;ereport(DEBUG1,(errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST),errmsg("unexpected EOF on client connection")));}return EOF;}}break;case 'X': /* terminate */doing_extended_query_message = false;ignore_till_sync = false;break;case 'B': /* bind */case 'C': /* close */case 'D': /* describe */case 'E': /* execute */case 'H': /* flush */case 'P': /* parse */doing_extended_query_message = true;/* these are only legal in protocol 3 */if (PG_PROTOCOL_MAJOR(FrontendProtocol) < 3)ereport(FATAL,(errcode(ERRCODE_PROTOCOL_VIOLATION),errmsg("invalid frontend message type %d", qtype)));break;case 'S': /* sync *//* stop any active skip-till-Sync */ignore_till_sync = false;/* mark not-extended, so that a new error doesn't begin skip */doing_extended_query_message = false;/* only legal in protocol 3 */if (PG_PROTOCOL_MAJOR(FrontendProtocol) < 3)ereport(FATAL,(errcode(ERRCODE_PROTOCOL_VIOLATION),errmsg("invalid frontend message type %d", qtype)));break;case 'd': /* copy data */case 'c': /* copy done */case 'f': /* copy fail */doing_extended_query_message = false;/* these are only legal in protocol 3 */if (PG_PROTOCOL_MAJOR(FrontendProtocol) < 3)ereport(FATAL,(errcode(ERRCODE_PROTOCOL_VIOLATION),errmsg("invalid frontend message type %d", qtype)));break;default:/** Otherwise we got garbage from the frontend. We treat this as* fatal because we have probably lost message boundary sync, and* there's no good way to recover.*/ereport(FATAL,(errcode(ERRCODE_PROTOCOL_VIOLATION),errmsg("invalid frontend message type %d", qtype)));break;}/** In protocol version 3, all frontend messages have a length word next* after the type code; we can read the message contents independently of* the type.*/if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 3){if (pq_getmessage(inBuf, 0))return EOF; /* suitable message already logged */}elsepq_endmsgread();RESUME_CANCEL_INTERRUPTS();return qtype;
}
1.2 消息交互流程
1.2.1 启动流程
客户端首先发送startup packet到服务器,服务器判断是否需要授权信息,需要的话这发送权限认证请求要求客户端发送密码,权限验证后进行一些参数信息传递,最后发送readyforquery表明连接创建成功。
1.2.2 简单查询流程
客户端会发送一个Query消息,服务端处理请求,然后返回查询结果,查询结果包含两部分,一是结构,即RowDescription(如列名,类型,长度);另外一个是数据,即DataRow(一行数据).
每个sql结束后都会返回一个commandcomplete,查询请求结束后回回复一条ReadyForQuery。
1.2.3 扩展查询
扩展查询将一个查询分为多个步骤,客户端先给服务的发送Parse消息,该消息包含参数化sql,服务端收到消息后,调用exec_parse_message 函数进行处理,进行语法分析、语义分析和重写,同时会创建一个 Plan Cache 的结构,用于缓存后续的执行计划;然后客户端发送bind消息,携带具体参数值,服务端收到该消息后,调用 exec_bind_message 函数进行处理;然后客户端发送describe,获取结果的列名,类型等信息;然后客户端发送execute去获取DataRow;最后Sync是扩展协议的请求消息结束标识。
1.2.3.1 pipelining
扩展查询允许使用流水线,即发送的一系列查询无需等待较早查询完成,减少给定系列操作的网络所需往返次数。但是需要考虑一个步骤失败,后续步骤已经发送给服务器的问题。
1.2.4 取消流程
在查询期间,客户端可能会请求取消查询,为了不让服务端在处理时不断检查来自客户端的新输入,取消请求采用的方式是打开一个新连接然后发送CancelRequet请求,而不是StartUp。那么如何知道要取消哪一个请求?依靠的是查询startup阶段,服务端返回给客户端的进程id和一个取消码。
1.2.5 结束流程
正常终止通过客户端发送的terminate终止消息来关闭连接,在异常常见下(数据库关闭),服务段可能没有收到请求就断开连接,在这种常见下,服务端会尝试在关闭连接之前发送错误或通知消息,给出原因。
1.2.6 copy流程
Copy 子协议对应三种模式:
copy-in 导入数据,对应命令 COPY FROM STDIN
copy-out 导出数据,对应命令 COPY TO STDOUT
copy-both 用于 walsender,在主备间批量传输数据
以 copy-in 为例,服务端收到 COPY 命令后,进入 COPY 模式,并回复 CopyInResponse。随后客户端通过 CopyData 消息传输数据,CopyComplete 消息标识数据传输完成,服务端收到该消息后,发送 CommandComplete 和 ReadyForQuery 消息,消息流如下:
1.2.7 错误和通知
错误和通知通过ErrorResponse 和通知消息 NoticeResponse来返回,具体含义如下: