PgSQL前后端通信协议

了解PostgreSQL服务器与客户端通信使用的TCP协议

了解PostgreSQL服务器与客户端通信使用的TCP协议

启动阶段

基本流程

  • 客户端发送一条StartupMessage (F)向服务端发起连接请求

    载荷包括0x30000的Int32版本号魔数,以及一系列kv结构的运行时参数(NULL0分割,必须参数为user),

  • 客户端等待服务端响应,主要是等待服务端发送的ReadyForQuery (Z)事件,该事件代表服务端已经准备好接收请求。

上面是连接建立过程中最主要的两个事件,其他事件包括包括认证消息 AuthenticationXXX (R) ,后端密钥消息 BackendKeyData (K),错误消息ErrorResponse (E),一系列上下文无关消息(NoticeResponse (N)NotificationResponse (A)ParameterStatus(S)

模拟客户端

编写一个go程序模拟这一过程

package main

import (
	"fmt"
	"net"
	"time"

	"github.com/jackc/pgx/pgproto3"
)

func GetFrontend(address string) *pgproto3.Frontend {
	conn, _ := (&net.Dialer{KeepAlive: 5 * time.Minute}).Dial("tcp4", address)
	frontend, _ := pgproto3.NewFrontend(conn, conn)
	return frontend
}

func main() {
	frontend := GetFrontend("127.0.0.1:5432")

	// 建立连接
	startupMsg := &pgproto3.StartupMessage{
		ProtocolVersion: pgproto3.ProtocolVersionNumber,
		Parameters:      map[string]string{"user": "vonng"},
	}
	frontend.Send(startupMsg)

	// 启动过程,收到ReadyForQuery消息代表启动过程结束
	for {
		msg, _ := frontend.Receive()
		fmt.Printf("%T %v\n", msg, msg)
		if _, ok := msg.(*pgproto3.ReadyForQuery); ok {
			fmt.Println("[STARTUP] connection established")
			break
		}
	}

	// 简单查询协议
	simpleQueryMsg := &pgproto3.Query{String: `SELECT 1 as a;`}
	frontend.Send(simpleQueryMsg)
	// 收到CommandComplete消息代表查询结束
	for {
		msg, _ := frontend.Receive()
		fmt.Printf("%T %v\n", msg, msg)
		if _, ok := msg.(*pgproto3.CommandComplete); ok {
			fmt.Println("[QUERY] query complete")
			break
		}
	}
}

输出结果为:

*pgproto3.Authentication &{0 [0 0 0 0] [] []}
*pgproto3.ParameterStatus &{application_name }
*pgproto3.ParameterStatus &{client_encoding UTF8}
*pgproto3.ParameterStatus &{DateStyle ISO, MDY}
*pgproto3.ParameterStatus &{integer_datetimes on}
*pgproto3.ParameterStatus &{IntervalStyle postgres}
*pgproto3.ParameterStatus &{is_superuser on}
*pgproto3.ParameterStatus &{server_encoding UTF8}
*pgproto3.ParameterStatus &{server_version 11.3}
*pgproto3.ParameterStatus &{session_authorization vonng}
*pgproto3.ParameterStatus &{standard_conforming_strings on}
*pgproto3.ParameterStatus &{TimeZone PRC}
*pgproto3.BackendKeyData &{35703 345830596}
*pgproto3.ReadyForQuery &{73}
[STARTUP] connection established
*pgproto3.RowDescription &{[{a 0 0 23 4 -1 0}]}
*pgproto3.DataRow &{[[49]]}
*pgproto3.CommandComplete &{SELECT 1}
[QUERY] query complete

连接代理

可以在jackc/pgx/pgproto3的基础上,很轻松地编写一些中间件。例如下面的代码就是一个非常简单的“连接代理”:

package main

import (
	"io"
	"net"
	"strings"
	"time"

	"github.com/jackc/pgx/pgproto3"
)

type ProxyServer struct {
	UpstreamAddr string
	ListenAddr   string
	Listener     net.Listener
	Dialer       net.Dialer
}

func NewProxyServer(listenAddr, upstreamAddr string) *ProxyServer {
	ln, _ := net.Listen(`tcp4`, listenAddr)
	return &ProxyServer{
		ListenAddr:   listenAddr,
		UpstreamAddr: upstreamAddr,
		Listener:     ln,
		Dialer:       net.Dialer{KeepAlive: 1 * time.Minute},
	}
}

func (ps *ProxyServer) Serve() error {
	for {
		conn, err := ps.Listener.Accept()
		if err != nil {
			panic(err)
		}
		go ps.ServeOne(conn)
	}
}

func (ps *ProxyServer) ServeOne(clientConn net.Conn) error {
	backend, _ := pgproto3.NewBackend(clientConn, clientConn)
	startupMsg, err := backend.ReceiveStartupMessage()
	if err != nil && strings.Contains(err.Error(), "ssl") {
		if _, err := clientConn.Write([]byte(`N`)); err != nil {
			panic(err)
		}
		// ssl is not welcome, now receive real startup msg
		startupMsg, err = backend.ReceiveStartupMessage()
		if err != nil {
			panic(err)
		}
	}

	serverConn, _ := ps.Dialer.Dial(`tcp4`, ps.UpstreamAddr)
	frontend, _ := pgproto3.NewFrontend(serverConn, serverConn)
	frontend.Send(startupMsg)

	errChan := make(chan error, 2)
	go func() {
		_, err := io.Copy(clientConn, serverConn)
		errChan <- err
	}()
	go func() {
		_, err := io.Copy(serverConn, clientConn)
		errChan <- err
	}()

	return <-errChan
}

func main() {
	proxy := NewProxyServer("127.0.0.1:5433", "127.0.0.1:5432")
	proxy.Serve()
}

这里代理监听5433端口,并将消息解析并转发至在5432端口的真实的数据库服务器。在另一个Session中执行以下命令:

$ psql postgres://127.0.0.1:5433/data?sslmode=disable -c 'SELECT * FROM pg_stat_activity LIMIT 1;'

可以观察到这一过程中的消息往来:

[B2F] *pgproto3.ParameterStatus &{application_name psql}
[B2F] *pgproto3.ParameterStatus &{client_encoding UTF8}
[B2F] *pgproto3.ParameterStatus &{DateStyle ISO, MDY}
[B2F] *pgproto3.ParameterStatus &{integer_datetimes on}
[B2F] *pgproto3.ParameterStatus &{IntervalStyle postgres}
[B2F] *pgproto3.ParameterStatus &{is_superuser on}
[B2F] *pgproto3.ParameterStatus &{server_encoding UTF8}
[B2F] *pgproto3.ParameterStatus &{server_version 11.3}
[B2F] *pgproto3.ParameterStatus &{session_authorization vonng}
[B2F] *pgproto3.ParameterStatus &{standard_conforming_strings on}
[B2F] *pgproto3.ParameterStatus &{TimeZone PRC}
[B2F] *pgproto3.BackendKeyData &{41588 1354047533}
[B2F] *pgproto3.ReadyForQuery &{73}
[F2B] *pgproto3.Query &{SELECT * FROM pg_stat_activity LIMIT 1;}
[B2F] *pgproto3.RowDescription &{[{datid 11750 1 26 4 -1 0} {datname 11750 2 19 64 -1 0} {pid 11750 3 23 4 -1 0} {usesysid 11750 4 26 4 -1 0} {usename 11750 5 19 64 -1 0} {application_name 11750 6 25 -1 -1 0} {client_addr 11750 7 869 -1 -1 0} {client_hostname 11750 8 25 -1 -1 0} {client_port 11750 9 23 4 -1 0} {backend_start 11750 10 1184 8 -1 0} {xact_start 11750 11 1184 8 -1 0} {query_start 11750 12 1184 8 -1 0} {state_change 11750 13 1184 8 -1 0} {wait_event_type 11750 14 25 -1 -1 0} {wait_event 11750 15 25 -1 -1 0} {state 11750 16 25 -1 -1 0} {backend_xid 11750 17 28 4 -1 0} {backend_xmin 11750 18 28 4 -1 0} {query 11750 19 25 -1 -1 0} {backend_type 11750 20 25 -1 -1 0}]}
[B2F] *pgproto3.DataRow &{[[] [] [52 56 55 52] [] [] [] [] [] [] [50 48 49 57 45 48 53 45 49 56 32 50 48 58 52 56 58 49 57 46 51 50 55 50 54 55 43 48 56] [] [] [] [65 99 116 105 118 105 116 121] [65 117 116 111 86 97 99 117 117 109 77 97 105 110] [] [] [] [] [97 117 116 111 118 97 99 117 117 109 32 108 97 117 110 99 104 101 114]]}
[B2F] *pgproto3.CommandComplete &{SELECT 1}
[B2F] *pgproto3.ReadyForQuery &{73}
[F2B] *pgproto3.Terminate &{}