From c5cecc665e396807720b6ba96a3a7086d23bb9bb Mon Sep 17 00:00:00 2001 From: Yang Keao Date: Thu, 8 Jan 2026 20:06:34 +0800 Subject: [PATCH 1/2] support audit log plugin v2 Signed-off-by: Yang Keao --- cmd/replayer/main.go | 2 +- pkg/server/api/traffic.go | 2 +- pkg/sqlreplay/cmd/audit_log_extension.go | 315 ++++++++++++ pkg/sqlreplay/cmd/audit_log_extension_test.go | 480 ++++++++++++++++++ pkg/sqlreplay/cmd/audit_log_plugin.go | 25 +- pkg/sqlreplay/cmd/audit_log_plugin_test.go | 20 +- pkg/sqlreplay/cmd/cmd.go | 22 +- pkg/sqlreplay/manager/job.go | 15 +- pkg/sqlreplay/replay/mock_test.go | 2 +- pkg/sqlreplay/replay/replay.go | 24 +- pkg/sqlreplay/store/line.go | 2 +- pkg/sqlreplay/store/rotate.go | 7 +- pkg/sqlreplay/store/rotate_test.go | 2 +- 13 files changed, 881 insertions(+), 37 deletions(-) create mode 100644 pkg/sqlreplay/cmd/audit_log_extension.go create mode 100644 pkg/sqlreplay/cmd/audit_log_extension_test.go diff --git a/cmd/replayer/main.go b/cmd/replayer/main.go index 75deda7fa..6ba536f3f 100644 --- a/cmd/replayer/main.go +++ b/cmd/replayer/main.go @@ -136,7 +136,7 @@ func main() { Speed: *speed, Username: *username, Password: *password, - Format: *format, + Format: replaycmd.TrafficFormat(*format), ReadOnly: *readonly, StartTime: *startTime, CommandStartTime: *cmdStartTime, diff --git a/pkg/server/api/traffic.go b/pkg/server/api/traffic.go index a5250ad03..a92c4f3b3 100644 --- a/pkg/server/api/traffic.go +++ b/pkg/server/api/traffic.go @@ -100,7 +100,7 @@ func (h *Server) TrafficReplay(c *gin.Context) { } cfg.Username = c.PostForm("username") cfg.Password = c.PostForm("password") - cfg.Format = c.PostForm("format") + cfg.Format = cmd.TrafficFormat(c.PostForm("format")) cfg.ReadOnly = strings.EqualFold(c.PostForm("readonly"), "true") cfg.IgnoreErrs = strings.EqualFold(c.PostForm("ignore-errs"), "true") cfg.KeyFile = globalCfg.Security.EncryptionKeyPath diff --git a/pkg/sqlreplay/cmd/audit_log_extension.go b/pkg/sqlreplay/cmd/audit_log_extension.go new file mode 100644 index 000000000..c86636852 --- /dev/null +++ b/pkg/sqlreplay/cmd/audit_log_extension.go @@ -0,0 +1,315 @@ +// Copyright 2026 PingCAP, Inc. +// SPDX-License-Identifier: Apache-2.0 + +package cmd + +import ( + "fmt" + "strconv" + "strings" + "time" + + "github.com/pingcap/tiproxy/lib/util/errors" + pnet "github.com/pingcap/tiproxy/pkg/proxy/net" + "github.com/siddontang/go/hack" + "go.uber.org/zap" +) + +var _ AuditLogDecoder = (*AuditLogExtensionDecoder)(nil) + +type AuditLogExtensionDecoder struct { + connInfo map[uint64]auditLogPluginConnCtx + commandEndTime time.Time + // pendingCmds contains the commands that has not been returned yet. + pendingCmds []*Command + psCloseStrategy PSCloseStrategy + idAllocator *ConnIDAllocator + lg *zap.Logger +} + +func NewAuditLogExtensionDecoder(lg *zap.Logger) AuditLogDecoder { + return &AuditLogExtensionDecoder{ + connInfo: make(map[uint64]auditLogPluginConnCtx), + psCloseStrategy: PSCloseStrategyDirected, + lg: lg, + } +} + +// EnableFilterCommandWithRetry implements [AuditLogDecoder]. +func (decoder *AuditLogExtensionDecoder) EnableFilterCommandWithRetry() { + // do nothing for extension decoder, it's not supported yet +} + +// SetUserAllowlist implements [AuditLogDecoder]. +func (decoder *AuditLogExtensionDecoder) SetUserAllowlist(users []string) { + // do nothing for extension decoder, it's not supported yet +} + +// SetCommandEndTime implements [AuditLogDecoder]. +func (decoder *AuditLogExtensionDecoder) SetCommandEndTime(t time.Time) { + decoder.commandEndTime = t +} + +// SetIDAllocator implements [AuditLogDecoder]. +func (decoder *AuditLogExtensionDecoder) SetIDAllocator(alloc *ConnIDAllocator) { + decoder.idAllocator = alloc +} + +// SetPSCloseStrategy implements [AuditLogDecoder]. +func (decoder *AuditLogExtensionDecoder) SetPSCloseStrategy(s PSCloseStrategy) { + decoder.psCloseStrategy = s +} + +// SetCommandStartTime implements [AuditLogDecoder]. +func (decoder *AuditLogExtensionDecoder) SetCommandStartTime(t time.Time) { + // do nothing for extension decoder +} + +func (decoder *AuditLogExtensionDecoder) Decode(reader LineReader) (retCmd *Command, err error) { + defer func() { + if retCmd != nil { + fmt.Println("Decoded command:", retCmd.ConnID, retCmd.Line, retCmd.StartTs, retCmd.EndTs, "error:", err) + } + }() + if len(decoder.pendingCmds) > 0 { + cmd := decoder.pendingCmds[0] + decoder.pendingCmds = decoder.pendingCmds[1:] + return cmd, nil + } + + kvs := make(map[string]string, 25) + for { + line, filename, lineIdx, err := reader.ReadLine() + if err != nil { + return nil, err + } + clear(kvs) + err = parseLog(kvs, hack.String(line)) + if err != nil { + return nil, errors.Errorf("%s, line %d: %s", filename, lineIdx, err.Error()) + } + connStr := kvs[auditPluginKeyConnID] + if len(connStr) == 0 { + return nil, errors.Errorf("%s, line %d: no connection id in line: %s", filename, lineIdx, line) + } + upstreamConnID, err := strconv.ParseUint(connStr, 10, 64) + if err != nil { + return nil, errors.Errorf("%s, line %d: parsing connection id failed: %s", filename, lineIdx, connStr) + } + + // TODO: add both startTs and endTs in extension log. We only have the endTS is the current format. + endTs, err := time.Parse(timeLayout, kvs[auditPluginKeyLogTime]) + if endTs.Before(decoder.commandEndTime) { + // Ignore the commands before CommandEndTime. + continue + } + + var connID uint64 + if connCtx, ok := decoder.connInfo[upstreamConnID]; ok { + connID = connCtx.connID + } else { + // New connection, allocate a new connection ID. + if decoder.idAllocator == nil { + connID = upstreamConnID + } else { + connID = decoder.idAllocator.alloc() + } + connCtx.connID = connID + decoder.connInfo[upstreamConnID] = connCtx + } + + eventStr := kvs[auditPluginKeyEvent] + if len(eventStr) <= 4 { + return nil, errors.Errorf("%s, line %d: invalid event field: %s", filename, lineIdx, eventStr) + } + // Remove the surrounding quotes and brackets. + eventStr = eventStr[2 : len(eventStr)-2] + events := strings.Split(eventStr, ",") + var cmds []*Command + switch events[0] { + case "CONNECTION": + if len(events) > 1 && events[1] == "DISCONNECT" { + delete(decoder.connInfo, upstreamConnID) + cmds = []*Command{{ + Type: pnet.ComQuit, + Payload: []byte{pnet.ComQuit.Byte()}, + }} + } + case "QUERY": + cmds, err = decoder.parseQueryEvent(kvs, events, upstreamConnID) + } + if err != nil { + return nil, errors.Wrapf(err, "%s, line %d", filename, lineIdx) + } + // The log is ignored, skip. + if len(cmds) == 0 { + continue + } + + db := kvs[auditPluginKeyCurDB] + for _, cmd := range cmds { + cmd.Success = true + cmd.UpstreamConnID = upstreamConnID + cmd.ConnID = connID + // We don't have an accurate startTs in extension log. + cmd.StartTs = endTs + cmd.CurDB = db + cmd.FileName = filename + cmd.Line = lineIdx + cmd.EndTs = endTs + cmd.kvs = kvs + } + if len(cmds) > 1 { + decoder.pendingCmds = cmds[1:] + } + return cmds[0], nil + } +} + +func (decoder *AuditLogExtensionDecoder) parseQueryEvent(kvs map[string]string, events []string, connID uint64) ([]*Command, error) { + connInfo := decoder.connInfo[connID] + if connInfo.preparedStmt == nil { + connInfo.preparedStmt = make(map[uint32]struct{}) + connInfo.preparedStmtSql = make(map[string]uint32) + } + + var sql string + sqlStr := kvs[auditPluginKeySQL] + if len(sqlStr) > 0 { + var err error + sql, err = parseSQL(sqlStr) + if err != nil { + return nil, errors.Wrapf(err, "unquote sql failed: %s", sqlStr) + } + } + cmds := make([]*Command, 0, 3) + // Only handle two events: + // - QUERY,EXECUTE + // - QUERY + if events[0] == "QUERY" && len(events) > 1 && events[1] == "EXECUTE" { + params, ok := kvs[auditPluginKeyParams] + if !ok { + return nil, nil + } + args, err := parseExecuteParamsForExtension(params) + if err != nil { + return nil, err + } + + var stmtID uint32 + var shouldPrepare bool + + switch decoder.psCloseStrategy { + case PSCloseStrategyAlways: + connInfo.lastPsID++ + decoder.connInfo[connID] = connInfo + stmtID = connInfo.lastPsID + shouldPrepare = true + case PSCloseStrategyNever: + if id, ok := connInfo.preparedStmtSql[sql]; ok { + shouldPrepare = false + stmtID = id + } else { + connInfo.lastPsID++ + connInfo.preparedStmtSql[sql] = connInfo.lastPsID + decoder.connInfo[connID] = connInfo + stmtID = connInfo.lastPsID + shouldPrepare = true + } + } + + // Append PREPARE command if needed. + if shouldPrepare { + cmds = append(cmds, &Command{ + CapturedPsID: stmtID, + Type: pnet.ComStmtPrepare, + StmtType: kvs[auditPluginKeyStmtType], + PreparedStmt: sql, + Payload: append([]byte{pnet.ComStmtPrepare.Byte()}, hack.Slice(sql)...), + }) + } + + // Append EXECUTE command + executeReq, err := pnet.MakeExecuteStmtRequest(stmtID, args, true) + if err != nil { + return nil, errors.Wrapf(err, "make execute request failed") + } + cmds = append(cmds, &Command{ + CapturedPsID: stmtID, + Type: pnet.ComStmtExecute, + StmtType: kvs[auditPluginKeyStmtType], + PreparedStmt: sql, + Params: args, + Payload: executeReq, + }) + connInfo.lastCmd = cmds[len(cmds)-1] + + // Append CLOSE command if needed. + if decoder.psCloseStrategy == PSCloseStrategyAlways { + // close the prepared statement right after it's executed. + cmds = append(cmds, &Command{ + CapturedPsID: stmtID, + Type: pnet.ComStmtClose, + StmtType: kvs[auditPluginKeyStmtType], + PreparedStmt: sql, + Payload: pnet.MakeCloseStmtRequest(stmtID), + }) + } + } else if events[0] == "QUERY" { + cmds = append(cmds, &Command{ + Type: pnet.ComQuery, + StmtType: kvs[auditPluginKeyStmtType], + Payload: append([]byte{pnet.ComQuery.Byte()}, hack.Slice(sql)...), + }) + connInfo.lastCmd = cmds[0] + } + + decoder.connInfo[connID] = connInfo + return cmds, nil +} + +// parseExecuteParamsForExtension parses the param in audit log extension field like "[1,abc,NULL,\"test bytes\""]" +// This function has the following known limitations: +// - All params are returned as string type. It cannot distinguish int 1 and string "1". +// - It cannot distinguish single empty string and no param. +func parseExecuteParamsForExtension(value string) ([]any, error) { + v, err := strconv.Unquote(value) + if err != nil { + return nil, errors.Wrapf(err, "unquote execute params failed: %s", value) + } + if v[0] != '[' || v[len(v)-1] != ']' { + return nil, errors.Errorf("no brackets in params: %s", value) + } + v = v[1 : len(v)-1] + if len(v) == 0 { + return nil, nil + } + + params := make([]any, 0, 10) + for idx := 0; idx < len(v); idx++ { + switch v[idx] { + case '"': + endIdx := skipQuotes(v[idx+1:], false) + if endIdx == -1 { + return nil, errors.Errorf("unterminated quote in params: %s", v[idx+1:]) + } + + unquoted, err := strconv.Unquote(v[idx : idx+endIdx+2]) + if err != nil { + return nil, errors.Wrapf(err, "unquote param failed: %s", v[idx:idx+endIdx+2]) + } + params = append(params, unquoted) + idx += endIdx + 1 + case ',', ' ': + default: + endIdx := strings.Index(v[idx:], ",") + if endIdx == -1 { + endIdx = len(v) - idx + } + params = append(params, v[idx:idx+endIdx]) + idx += endIdx - 1 + } + } + + return params, nil +} diff --git a/pkg/sqlreplay/cmd/audit_log_extension_test.go b/pkg/sqlreplay/cmd/audit_log_extension_test.go new file mode 100644 index 000000000..4d748f78a --- /dev/null +++ b/pkg/sqlreplay/cmd/audit_log_extension_test.go @@ -0,0 +1,480 @@ +// Copyright 2026 PingCAP, Inc. +// SPDX-License-Identifier: Apache-2.0 + +package cmd + +import ( + "testing" + "time" + + pnet "github.com/pingcap/tiproxy/pkg/proxy/net" + "github.com/stretchr/testify/require" + "go.uber.org/zap" +) + +func TestDecodeMultiLinesForExtension(t *testing.T) { + tests := []struct { + lines string + cmds []*Command + }{ + { + // db is changed in the second sql + lines: `[2026/01/08 19:44:11.099 +08:00] [INFO] [ID=f1c681c2-8d80-4677-9dd4-6b7222610aa8-0009] [EVENT="[QUERY,QUERY_DDL]"] [USER=root] [ROLES="[]"] [CONNECTION_ID=260047062] [SESSION_ALIAS=] [TABLES="[` + "`" + `test` + "`" + `.` + "`" + `test_table` + "`" + `]"] [STATUS_CODE=1] [CURRENT_DB=test] [SQL_TEXT="CREATE TABLE IF NOT EXISTS test_table (id BIGINT PRIMARY KEY, name VARCHAR(255), age INT, salary DECIMAL(10,2), price FLOAT, weight DOUBLE, is_active BOOLEAN, data BLOB, created_at TIMESTAMP)"] +[2026/01/08 19:44:11.110 +08:00] [INFO] [ID=f1c681c2-8d80-4677-9dd4-6b7222610aa8-000a] [EVENT="[QUERY,QUERY_DML,INSERT]"] [USER=root] [ROLES="[]"] [CONNECTION_ID=260047062] [SESSION_ALIAS=] [TABLES="[` + "`" + `test` + "`" + `.` + "`" + `test_table` + "`" + `]"] [STATUS_CODE=1] [CURRENT_DB=test1] [SQL_TEXT="INSERT IGNORE INTO test_table (id, name, age, salary, price, weight, is_active, created_at) VALUES (1, 'base_record', 25, 50000.00, 99.99, 123.456789, true, '2023-01-01 12:00:00')"] [AFFECTED_ROWS=1]`, + cmds: []*Command{ + { + CurDB: "test", + StartTs: time.Date(2026, 1, 8, 19, 44, 11, 99000000, time.Local), + EndTs: time.Date(2026, 1, 8, 19, 44, 11, 99000000, time.Local), + ConnID: 260047062, + UpstreamConnID: 260047062, + Type: pnet.ComQuery, + Payload: append([]byte{pnet.ComQuery.Byte()}, []byte("CREATE TABLE IF NOT EXISTS test_table (id BIGINT PRIMARY KEY, name VARCHAR(255), age INT, salary DECIMAL(10,2), price FLOAT, weight DOUBLE, is_active BOOLEAN, data BLOB, created_at TIMESTAMP)")...), + Line: 1, + Success: true, + }, + { + CurDB: "test1", + StartTs: time.Date(2026, 1, 8, 19, 44, 11, 110000000, time.Local), + EndTs: time.Date(2026, 1, 8, 19, 44, 11, 110000000, time.Local), + ConnID: 260047062, + UpstreamConnID: 260047062, + Type: pnet.ComQuery, + Payload: append([]byte{pnet.ComQuery.Byte()}, []byte("INSERT IGNORE INTO test_table (id, name, age, salary, price, weight, is_active, created_at) VALUES (1, 'base_record', 25, 50000.00, 99.99, 123.456789, true, '2023-01-01 12:00:00')")...), + Line: 2, + Success: true, + }, + }, + }, + { + // db stays the same in the second sql + lines: `[2026/01/08 19:44:11.099 +08:00] [INFO] [ID=f1c681c2-8d80-4677-9dd4-6b7222610aa8-0009] [EVENT="[QUERY,QUERY_DDL]"] [USER=root] [ROLES="[]"] [CONNECTION_ID=260047062] [SESSION_ALIAS=] [TABLES="[` + "`" + `test` + "`" + `.` + "`" + `test_table` + "`" + `]"] [STATUS_CODE=1] [CURRENT_DB=test] [SQL_TEXT="CREATE TABLE IF NOT EXISTS test_table (id BIGINT PRIMARY KEY, name VARCHAR(255), age INT, salary DECIMAL(10,2), price FLOAT, weight DOUBLE, is_active BOOLEAN, data BLOB, created_at TIMESTAMP)"] +[2026/01/08 19:44:11.110 +08:00] [INFO] [ID=f1c681c2-8d80-4677-9dd4-6b7222610aa8-000a] [EVENT="[QUERY,QUERY_DML,INSERT]"] [USER=root] [ROLES="[]"] [CONNECTION_ID=260047062] [SESSION_ALIAS=] [TABLES="[` + "`" + `test` + "`" + `.` + "`" + `test_table` + "`" + `]"] [STATUS_CODE=1] [CURRENT_DB=test] [SQL_TEXT="INSERT IGNORE INTO test_table (id, name, age, salary, price, weight, is_active, created_at) VALUES (1, 'base_record', 25, 50000.00, 99.99, 123.456789, true, '2023-01-01 12:00:00')"] [AFFECTED_ROWS=1]`, + cmds: []*Command{ + { + CurDB: "test", + StartTs: time.Date(2026, 1, 8, 19, 44, 11, 99000000, time.Local), + EndTs: time.Date(2026, 1, 8, 19, 44, 11, 99000000, time.Local), + ConnID: 260047062, + UpstreamConnID: 260047062, + Type: pnet.ComQuery, + Payload: append([]byte{pnet.ComQuery.Byte()}, []byte("CREATE TABLE IF NOT EXISTS test_table (id BIGINT PRIMARY KEY, name VARCHAR(255), age INT, salary DECIMAL(10,2), price FLOAT, weight DOUBLE, is_active BOOLEAN, data BLOB, created_at TIMESTAMP)")...), + Line: 1, + Success: true, + }, + { + CurDB: "test", + StartTs: time.Date(2026, 1, 8, 19, 44, 11, 110000000, time.Local), + EndTs: time.Date(2026, 1, 8, 19, 44, 11, 110000000, time.Local), + ConnID: 260047062, + UpstreamConnID: 260047062, + Type: pnet.ComQuery, + Payload: append([]byte{pnet.ComQuery.Byte()}, []byte("INSERT IGNORE INTO test_table (id, name, age, salary, price, weight, is_active, created_at) VALUES (1, 'base_record', 25, 50000.00, 99.99, 123.456789, true, '2023-01-01 12:00:00')")...), + Line: 2, + Success: true, + }, + }, + }, + { + // new connection + quit connection + lines: `[2026/01/08 19:44:27.746 +08:00] [INFO] [ID=2eda2307-54c1-49e5-9e65-eebf340b6d8f-0002] [EVENT="[CONNECTION,CONNECT]"] [USER=root] [ROLES="[]"] [CONNECTION_ID=260047206] [SESSION_ALIAS=] [TABLES="[]"] [STATUS_CODE=1] [CURRENT_DB=test] [CONNECTION_TYPE=TCP] [PID=454064] [SERVER_VERSION=v9.0.0-beta.2.pre-1017-gbedae51b62] [SSL_VERSION=] [HOST_IP=127.0.0.1] [HOST_PORT=4000] [CLIENT_IP=127.0.0.1] [CLIENT_PORT=49366] [AUTH_METHOD=mysql_native_password] [CONN_ATTRS="{\"_client_name\":\"Go-MySQL-Driver\",\"_os\":\"linux\",\"_pid\":\"460879\",\"_platform\":\"amd64\",\"_server_host\":\"127.0.0.1\"}"] +[2026/01/08 19:44:27.782 +08:00] [INFO] [ID=2eda2307-54c1-49e5-9e65-eebf340b6d8f-0003] [EVENT="[QUERY,QUERY_DDL]"] [USER=root] [ROLES="[]"] [CONNECTION_ID=260047206] [SESSION_ALIAS=] [TABLES="[` + "`" + `test` + "`" + `.` + "`" + `test_table` + "`" + `]"] [STATUS_CODE=1] [CURRENT_DB=test] [SQL_TEXT="CREATE TABLE IF NOT EXISTS test_table (id BIGINT PRIMARY KEY, name VARCHAR(255), age INT, salary DECIMAL(10,2), price FLOAT, weight DOUBLE, is_active BOOLEAN, data BLOB, created_at TIMESTAMP)"] +[2026/01/08 19:44:27.963 +08:00] [INFO] [ID=2eda2307-54c1-49e5-9e65-eebf340b6d8f-000e] [EVENT="[CONNECTION,DISCONNECT]"] [USER=root] [ROLES="[]"] [CONNECTION_ID=260047206] [SESSION_ALIAS=] [TABLES="[]"] [STATUS_CODE=1] [CONNECTION_TYPE=TCP] [PID=454064] [SERVER_VERSION=v9.0.0-beta.2.pre-1017-gbedae51b62] [SSL_VERSION=] [HOST_IP=127.0.0.1] [HOST_PORT=4000] [CLIENT_IP=127.0.0.1] [CLIENT_PORT=49366] [AUTH_METHOD=mysql_native_password] [CONN_ATTRS="{\"_client_name\":\"Go-MySQL-Driver\",\"_os\":\"linux\",\"_pid\":\"460879\",\"_platform\":\"amd64\",\"_server_host\":\"127.0.0.1\"}"]`, + cmds: []*Command{ + { + CurDB: "test", + StartTs: time.Date(2026, 1, 8, 19, 44, 27, 782000000, time.Local), + EndTs: time.Date(2026, 1, 8, 19, 44, 27, 782000000, time.Local), + ConnID: 260047206, + UpstreamConnID: 260047206, + Type: pnet.ComQuery, + Payload: append([]byte{pnet.ComQuery.Byte()}, []byte("CREATE TABLE IF NOT EXISTS test_table (id BIGINT PRIMARY KEY, name VARCHAR(255), age INT, salary DECIMAL(10,2), price FLOAT, weight DOUBLE, is_active BOOLEAN, data BLOB, created_at TIMESTAMP)")...), + Line: 2, + Success: true, + }, + { + CurDB: "", + StartTs: time.Date(2026, 1, 8, 19, 44, 27, 963000000, time.Local), + EndTs: time.Date(2026, 1, 8, 19, 44, 27, 963000000, time.Local), + ConnID: 260047206, + UpstreamConnID: 260047206, + Type: pnet.ComQuit, + Payload: []byte{pnet.ComQuit.Byte()}, + Line: 3, + Success: true, + }, + }, + }, + { + // 2 prepared statements + lines: `[2026/01/08 19:44:11.114 +08:00] [INFO] [ID=f1c681c2-8d80-4677-9dd4-6b7222610aa8-000b] [EVENT="[QUERY,EXECUTE,SELECT]"] [USER=root] [ROLES="[]"] [CONNECTION_ID=260047062] [SESSION_ALIAS=] [TABLES="[` + "`" + `test` + "`" + `.` + "`" + `test_table` + "`" + `]"] [STATUS_CODE=1] [CURRENT_DB=test] [SQL_TEXT="SELECT * FROM test_table WHERE is_active = ?"] [EXECUTE_PARAMS="[1]"] +[2026/01/08 19:44:11.379 +08:00] [INFO] [ID=187143c0-a8ea-44ec-b28d-5af6129fb3f9-000b] [EVENT="[QUERY,EXECUTE,SELECT]"] [USER=root] [ROLES="[]"] [CONNECTION_ID=260047062] [SESSION_ALIAS=] [TABLES="[` + "`" + `test` + "`" + `.` + "`" + `test_table` + "`" + `]"] [STATUS_CODE=1] [CURRENT_DB=test] [SQL_TEXT="SELECT * FROM test_table WHERE is_active = ?"] [EXECUTE_PARAMS="[0]"]`, + cmds: []*Command{ + { + Type: pnet.ComStmtPrepare, + CurDB: "test", + ConnID: 260047062, + UpstreamConnID: 260047062, + StartTs: time.Date(2026, 1, 8, 19, 44, 11, 114000000, time.Local), + EndTs: time.Date(2026, 1, 8, 19, 44, 11, 114000000, time.Local), + CapturedPsID: 1, + Payload: append([]byte{pnet.ComStmtPrepare.Byte()}, []byte("SELECT * FROM test_table WHERE is_active = ?")...), + StmtType: "", + PreparedStmt: "SELECT * FROM test_table WHERE is_active = ?", + Line: 1, + Success: true, + }, + { + Type: pnet.ComStmtExecute, + CurDB: "test", + ConnID: 260047062, + UpstreamConnID: 260047062, + StartTs: time.Date(2026, 1, 8, 19, 44, 11, 114000000, time.Local), + EndTs: time.Date(2026, 1, 8, 19, 44, 11, 114000000, time.Local), + CapturedPsID: 1, + Payload: []byte{0x17, 0x01, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x01, 0xfe, 0x00, 0x01, 0x31}, + StmtType: "", + PreparedStmt: "SELECT * FROM test_table WHERE is_active = ?", + Params: []any{"1"}, + Line: 1, + Success: true, + }, + { + Type: pnet.ComStmtClose, + CurDB: "test", + ConnID: 260047062, + UpstreamConnID: 260047062, + StartTs: time.Date(2026, 1, 8, 19, 44, 11, 114000000, time.Local), + EndTs: time.Date(2026, 1, 8, 19, 44, 11, 114000000, time.Local), + CapturedPsID: 1, + Payload: append([]byte{pnet.ComStmtClose.Byte()}, []byte{1, 0, 0, 0}...), + StmtType: "", + PreparedStmt: "SELECT * FROM test_table WHERE is_active = ?", + Line: 1, + Success: true, + }, + { + Type: pnet.ComStmtPrepare, + CurDB: "test", + ConnID: 260047062, + UpstreamConnID: 260047062, + StartTs: time.Date(2026, 1, 8, 19, 44, 11, 379000000, time.Local), + EndTs: time.Date(2026, 1, 8, 19, 44, 11, 379000000, time.Local), + CapturedPsID: 2, + Payload: append([]byte{pnet.ComStmtPrepare.Byte()}, []byte("SELECT * FROM test_table WHERE is_active = ?")...), + StmtType: "", + PreparedStmt: "SELECT * FROM test_table WHERE is_active = ?", + Line: 2, + Success: true, + }, + { + Type: pnet.ComStmtExecute, + CurDB: "test", + ConnID: 260047062, + UpstreamConnID: 260047062, + StartTs: time.Date(2026, 1, 8, 19, 44, 11, 379000000, time.Local), + EndTs: time.Date(2026, 1, 8, 19, 44, 11, 379000000, time.Local), + CapturedPsID: 2, + Payload: []byte{0x17, 0x02, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x01, 0xfe, 0x00, 0x01, 0x30}, + StmtType: "", + PreparedStmt: "SELECT * FROM test_table WHERE is_active = ?", + Params: []any{"0"}, + Line: 2, + Success: true, + }, + { + Type: pnet.ComStmtClose, + CurDB: "test", + ConnID: 260047062, + UpstreamConnID: 260047062, + StartTs: time.Date(2026, 1, 8, 19, 44, 11, 379000000, time.Local), + EndTs: time.Date(2026, 1, 8, 19, 44, 11, 379000000, time.Local), + CapturedPsID: 2, + Payload: append([]byte{pnet.ComStmtClose.Byte()}, []byte{2, 0, 0, 0}...), + StmtType: "", + PreparedStmt: "SELECT * FROM test_table WHERE is_active = ?", + Line: 2, + Success: true, + }, + }, + }, + { + // 2 different connections + lines: `[2026/01/08 19:44:11.114 +08:00] [INFO] [ID=f1c681c2-8d80-4677-9dd4-6b7222610aa8-000b] [EVENT="[QUERY,EXECUTE,SELECT]"] [USER=root] [ROLES="[]"] [CONNECTION_ID=260047062] [SESSION_ALIAS=] [TABLES="[` + "`" + `test` + "`" + `.` + "`" + `test_table` + "`" + `]"] [STATUS_CODE=1] [CURRENT_DB=test] [SQL_TEXT="SELECT * FROM test_table WHERE is_active = ?"] [EXECUTE_PARAMS="[1]"] +[2026/01/08 19:44:11.379 +08:00] [INFO] [ID=187143c0-a8ea-44ec-b28d-5af6129fb3f9-000b] [EVENT="[QUERY,EXECUTE,SELECT]"] [USER=root] [ROLES="[]"] [CONNECTION_ID=260047063] [SESSION_ALIAS=] [TABLES="[` + "`" + `test` + "`" + `.` + "`" + `test_table` + "`" + `]"] [STATUS_CODE=1] [CURRENT_DB=test] [SQL_TEXT="SELECT * FROM test_table WHERE is_active = ?"] [EXECUTE_PARAMS="[0]"]`, + cmds: []*Command{ + { + Type: pnet.ComStmtPrepare, + CurDB: "test", + ConnID: 260047062, + UpstreamConnID: 260047062, + StartTs: time.Date(2026, 1, 8, 19, 44, 11, 114000000, time.Local), + EndTs: time.Date(2026, 1, 8, 19, 44, 11, 114000000, time.Local), + CapturedPsID: 1, + Payload: append([]byte{pnet.ComStmtPrepare.Byte()}, []byte("SELECT * FROM test_table WHERE is_active = ?")...), + StmtType: "", + PreparedStmt: "SELECT * FROM test_table WHERE is_active = ?", + Line: 1, + Success: true, + }, + { + Type: pnet.ComStmtExecute, + CurDB: "test", + ConnID: 260047062, + UpstreamConnID: 260047062, + StartTs: time.Date(2026, 1, 8, 19, 44, 11, 114000000, time.Local), + EndTs: time.Date(2026, 1, 8, 19, 44, 11, 114000000, time.Local), + CapturedPsID: 1, + Payload: []byte{0x17, 0x01, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x01, 0xfe, 0x00, 0x01, 0x31}, + StmtType: "", + PreparedStmt: "SELECT * FROM test_table WHERE is_active = ?", + Params: []any{"1"}, + Line: 1, + Success: true, + }, + { + Type: pnet.ComStmtClose, + CurDB: "test", + ConnID: 260047062, + UpstreamConnID: 260047062, + StartTs: time.Date(2026, 1, 8, 19, 44, 11, 114000000, time.Local), + EndTs: time.Date(2026, 1, 8, 19, 44, 11, 114000000, time.Local), + CapturedPsID: 1, + Payload: append([]byte{pnet.ComStmtClose.Byte()}, []byte{1, 0, 0, 0}...), + StmtType: "", + PreparedStmt: "SELECT * FROM test_table WHERE is_active = ?", + Line: 1, + Success: true, + }, + { + Type: pnet.ComStmtPrepare, + CurDB: "test", + ConnID: 260047063, + UpstreamConnID: 260047063, + StartTs: time.Date(2026, 1, 8, 19, 44, 11, 379000000, time.Local), + EndTs: time.Date(2026, 1, 8, 19, 44, 11, 379000000, time.Local), + CapturedPsID: 1, + Payload: append([]byte{pnet.ComStmtPrepare.Byte()}, []byte("SELECT * FROM test_table WHERE is_active = ?")...), + StmtType: "", + PreparedStmt: "SELECT * FROM test_table WHERE is_active = ?", + Line: 2, + Success: true, + }, + { + Type: pnet.ComStmtExecute, + CurDB: "test", + ConnID: 260047063, + UpstreamConnID: 260047063, + StartTs: time.Date(2026, 1, 8, 19, 44, 11, 379000000, time.Local), + EndTs: time.Date(2026, 1, 8, 19, 44, 11, 379000000, time.Local), + CapturedPsID: 1, + Payload: []byte{0x17, 0x01, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x01, 0xfe, 0x00, 0x01, 0x30}, + StmtType: "", + PreparedStmt: "SELECT * FROM test_table WHERE is_active = ?", + Params: []any{"0"}, + Line: 2, + Success: true, + }, + { + Type: pnet.ComStmtClose, + CurDB: "test", + ConnID: 260047063, + UpstreamConnID: 260047063, + StartTs: time.Date(2026, 1, 8, 19, 44, 11, 379000000, time.Local), + EndTs: time.Date(2026, 1, 8, 19, 44, 11, 379000000, time.Local), + CapturedPsID: 1, + Payload: append([]byte{pnet.ComStmtClose.Byte()}, []byte{1, 0, 0, 0}...), + StmtType: "", + PreparedStmt: "SELECT * FROM test_table WHERE is_active = ?", + Line: 2, + Success: true, + }, + }, + }, + } + + for i, test := range tests { + decoder := NewAuditLogExtensionDecoder(zap.NewNop()) + decoder.SetPSCloseStrategy(PSCloseStrategyAlways) + mr := mockReader{data: append([]byte(test.lines), '\n'), filename: "my/file"} + cmds, err := decodeCmds(decoder, &mr) + if err != nil { + require.ErrorContains(t, err, "EOF", "case %d", i) + } + for _, cmd := range test.cmds { + cmd.FileName = "my/file" + } + require.Equal(t, test.cmds, cmds, "case %d", i) + } +} + +func TestDecodeAuditExtensionInNeverMode(t *testing.T) { + tests := []struct { + lines string + cmds []*Command + }{ + { + lines: `[2026/01/08 19:44:11.114 +08:00] [INFO] [ID=f1c681c2-8d80-4677-9dd4-6b7222610aa8-000b] [EVENT="[QUERY,EXECUTE,SELECT]"] [USER=root] [ROLES="[]"] [CONNECTION_ID=260047062] [SESSION_ALIAS=] [TABLES="[` + "`" + `test` + "`" + `.` + "`" + `test_table` + "`" + `]"] [STATUS_CODE=1] [CURRENT_DB=test] [SQL_TEXT="SELECT * FROM test_table WHERE is_active = ?"] [EXECUTE_PARAMS="[1]"] +[2026/01/08 19:44:11.379 +08:00] [INFO] [ID=187143c0-a8ea-44ec-b28d-5af6129fb3f9-000b] [EVENT="[QUERY,EXECUTE,SELECT]"] [USER=root] [ROLES="[]"] [CONNECTION_ID=260047062] [SESSION_ALIAS=] [TABLES="[` + "`" + `test` + "`" + `.` + "`" + `test_table` + "`" + `]"] [STATUS_CODE=1] [CURRENT_DB=test] [SQL_TEXT="SELECT * FROM test_table WHERE is_active = ?"] [EXECUTE_PARAMS="[0]"]`, + cmds: []*Command{ + { + Type: pnet.ComStmtPrepare, + CurDB: "test", + ConnID: 260047062, + UpstreamConnID: 260047062, + StartTs: time.Date(2026, 1, 8, 19, 44, 11, 114000000, time.Local), + EndTs: time.Date(2026, 1, 8, 19, 44, 11, 114000000, time.Local), + CapturedPsID: 1, + Payload: append([]byte{pnet.ComStmtPrepare.Byte()}, []byte("SELECT * FROM test_table WHERE is_active = ?")...), + StmtType: "", + PreparedStmt: "SELECT * FROM test_table WHERE is_active = ?", + Line: 1, + Success: true, + }, + { + Type: pnet.ComStmtExecute, + CurDB: "test", + ConnID: 260047062, + UpstreamConnID: 260047062, + StartTs: time.Date(2026, 1, 8, 19, 44, 11, 114000000, time.Local), + EndTs: time.Date(2026, 1, 8, 19, 44, 11, 114000000, time.Local), + CapturedPsID: 1, + Payload: []byte{0x17, 0x01, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x01, 0xfe, 0x00, 0x01, 0x31}, + StmtType: "", + PreparedStmt: "SELECT * FROM test_table WHERE is_active = ?", + Params: []any{"1"}, + Line: 1, + Success: true, + }, + { + Type: pnet.ComStmtExecute, + CurDB: "test", + ConnID: 260047062, + UpstreamConnID: 260047062, + StartTs: time.Date(2026, 1, 8, 19, 44, 11, 379000000, time.Local), + EndTs: time.Date(2026, 1, 8, 19, 44, 11, 379000000, time.Local), + CapturedPsID: 1, + Payload: []byte{0x17, 0x01, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x01, 0xfe, 0x00, 0x01, 0x30}, + StmtType: "", + PreparedStmt: "SELECT * FROM test_table WHERE is_active = ?", + Params: []any{"0"}, + Line: 2, + Success: true, + }, + }, + }, + } + + for i, test := range tests { + decoder := NewAuditLogExtensionDecoder(zap.NewNop()) + decoder.SetPSCloseStrategy(PSCloseStrategyNever) + mr := mockReader{data: append([]byte(test.lines), '\n')} + cmds, err := decodeCmds(decoder, &mr) + require.Error(t, err, "case %d", i) + require.Equal(t, test.cmds, cmds, "case %d", i) + } +} + +func TestParseExecuteParamsForExtension(t *testing.T) { + tests := []struct { + input string + expected []any + hasError bool + }{ + { + input: `"[]"`, + expected: nil, + hasError: false, + }, + { + input: `"[1]"`, + expected: []any{"1"}, + hasError: false, + }, + { + input: `"[1,2,3]"`, + expected: []any{"1", "2", "3"}, + hasError: false, + }, + { + input: `"[abc]"`, + expected: []any{"abc"}, + hasError: false, + }, + { + input: `"['abc']"`, + expected: []any{"'abc'"}, + hasError: false, + }, + { + input: `"[NULL]"`, + expected: []any{"NULL"}, + hasError: false, + }, + { + input: `"[1,abc,NULL,\"test bytes\"]"`, + expected: []any{"1", "abc", "NULL", "test bytes"}, + hasError: false, + }, + { + input: `"[\"test\\\"escape\"]"`, + expected: []any{"test\"escape"}, + hasError: false, + }, + { + input: `"[\"hello world\"]"`, + expected: []any{"hello world"}, + hasError: false, + }, + { + input: `"[1, 2, 3]"`, + expected: []any{"1", "2", "3"}, + hasError: false, + }, + { + input: `"[\"hello, world\"]"`, + expected: []any{"hello, world"}, + hasError: false, + }, + { + input: `"[1.5,2.75,3.14]"`, + expected: []any{"1.5", "2.75", "3.14"}, + hasError: false, + }, + { + input: `"[true,false]"`, + expected: []any{"true", "false"}, + hasError: false, + }, + { + input: `"[1,2,3"`, + expected: nil, + hasError: true, + }, + { + input: `"1,2,3]"`, + expected: nil, + hasError: true, + }, + { + input: `"[\"abc ]"`, + expected: nil, + hasError: true, + }, + { + input: `[1,2,3]`, + expected: nil, + hasError: true, + }, + } + + for _, tt := range tests { + result, err := parseExecuteParamsForExtension(tt.input) + if tt.hasError { + require.Error(t, err) + } else { + require.NoError(t, err) + require.Equal(t, tt.expected, result) + } + } +} diff --git a/pkg/sqlreplay/cmd/audit_log_plugin.go b/pkg/sqlreplay/cmd/audit_log_plugin.go index a3ec8e7ac..92a122818 100644 --- a/pkg/sqlreplay/cmd/audit_log_plugin.go +++ b/pkg/sqlreplay/cmd/audit_log_plugin.go @@ -32,6 +32,11 @@ const ( auditPluginKeyRetry = "RETRY" auditPluginKeyUser = "USER" + // auditPluginKeyLogTime is a special marker for the first field in audit log line, which is usually the + // log time. It doesn't have key in the original format, but replayer will give it a special key. + // As it's not a real key, it'll start with underscore to avoid conflict with real keys. + auditPluginKeyLogTime = "_LOG_TIME" + auditPluginClassGeneral = "GENERAL" auditPluginClassTableAccess = "TABLE_ACCESS" auditPluginClassConnect = "CONNECTION" @@ -73,7 +78,7 @@ type auditLogPluginConnCtx struct { lastCmd *Command } -func NewAuditLogPluginDecoder(dedup *DeDup, lg *zap.Logger) *AuditLogPluginDecoder { +func NewAuditLogPluginDecoder(dedup *DeDup, lg *zap.Logger) AuditLogDecoder { return &AuditLogPluginDecoder{ connInfo: make(map[uint64]auditLogPluginConnCtx), psCloseStrategy: PSCloseStrategyDirected, @@ -83,6 +88,7 @@ func NewAuditLogPluginDecoder(dedup *DeDup, lg *zap.Logger) *AuditLogPluginDecod } var _ CmdDecoder = (*AuditLogPluginDecoder)(nil) +var _ AuditLogDecoder = (*AuditLogPluginDecoder)(nil) // PSCloseStrategy defines when to close the prepared statements. type PSCloseStrategy string @@ -98,6 +104,16 @@ const ( PSCloseStrategyDirected PSCloseStrategy = "directed" ) +type AuditLogDecoder interface { + CmdDecoder + + SetPSCloseStrategy(s PSCloseStrategy) + SetIDAllocator(alloc *ConnIDAllocator) + SetCommandEndTime(t time.Time) + EnableFilterCommandWithRetry() + SetUserAllowlist(users []string) +} + type AuditLogPluginDecoder struct { connInfo map[uint64]auditLogPluginConnCtx commandStartTime time.Time @@ -254,6 +270,7 @@ func (decoder *AuditLogPluginDecoder) SetIDAllocator(alloc *ConnIDAllocator) { // All SQL_TEXT are converted into one line in audit log. func parseLog(kv map[string]string, line string) error { + firstField := true for idx := 0; idx < len(line); idx++ { switch line[idx] { case '[': @@ -262,7 +279,11 @@ func parseLog(kv map[string]string, line string) error { return err } idx += endIdx + 1 - if len(key) > 0 { + + if firstField { + kv[auditPluginKeyLogTime] = value + firstField = false + } else if len(key) > 0 { kv[key] = value } } diff --git a/pkg/sqlreplay/cmd/audit_log_plugin_test.go b/pkg/sqlreplay/cmd/audit_log_plugin_test.go index 9e71fad60..38bd04e80 100644 --- a/pkg/sqlreplay/cmd/audit_log_plugin_test.go +++ b/pkg/sqlreplay/cmd/audit_log_plugin_test.go @@ -193,6 +193,7 @@ func TestParseLog(t *testing.T) { }, { line: "[abc]", + kvs: map[string]string{auditPluginKeyLogTime: "abc"}, hasErr: false, }, { @@ -217,7 +218,7 @@ func TestParseLog(t *testing.T) { }, { line: "[abc=def]", - kvs: map[string]string{"abc": "def"}, + kvs: map[string]string{auditPluginKeyLogTime: "def"}, hasErr: false, }, { @@ -226,12 +227,12 @@ func TestParseLog(t *testing.T) { }, { line: "[abc=def=ghi]", - kvs: map[string]string{"abc": "def=ghi"}, + kvs: map[string]string{auditPluginKeyLogTime: "def=ghi"}, hasErr: false, }, { line: "[a=\"b\"]", - kvs: map[string]string{"a": "\"b\""}, + kvs: map[string]string{auditPluginKeyLogTime: "\"b\""}, hasErr: false, }, { @@ -240,7 +241,7 @@ func TestParseLog(t *testing.T) { }, { line: "[abc][a=b]", - kvs: map[string]string{"a": "b"}, + kvs: map[string]string{auditPluginKeyLogTime: "abc", "a": "b"}, hasErr: false, }, { @@ -249,16 +250,17 @@ func TestParseLog(t *testing.T) { }, { line: "a[abc]a", + kvs: map[string]string{auditPluginKeyLogTime: "abc"}, hasErr: false, }, { line: "a[a=b]a", - kvs: map[string]string{"a": "b"}, + kvs: map[string]string{auditPluginKeyLogTime: "b"}, hasErr: false, }, { line: "a[a=b]a[c=d]", - kvs: map[string]string{"a": "b", "c": "d"}, + kvs: map[string]string{auditPluginKeyLogTime: "b", "c": "d"}, hasErr: false, }, { @@ -267,17 +269,17 @@ func TestParseLog(t *testing.T) { }, { line: `[2025/09/06 17:03:53.720 +08:00] [INFO] [logger.go:77] [ID=17571494330] [TIMESTAMP=2025/09/06 17:03:53.720 +08:00] [EVENT_CLASS=GENERAL] [EVENT_SUBCLASS=] [STATUS_CODE=0] [COST_TIME=1336.083] [HOST=127.0.0.1] [CLIENT_IP=127.0.0.1] [USER=root] [DATABASES="[]"] [TABLES="[]"] [SQL_TEXT="select \"[=]\""] [ROWS=0] [CONNECTION_ID=3695181836] [CLIENT_PORT=63912] [PID=61215] [COMMAND=Query] [SQL_STATEMENTS=Select]`, - kvs: map[string]string{"ID": "17571494330", "TIMESTAMP": "2025/09/06 17:03:53.720 +08:00", "EVENT_CLASS": "GENERAL", "EVENT_SUBCLASS": "", "STATUS_CODE": "0", "COST_TIME": "1336.083", "HOST": "127.0.0.1", "CLIENT_IP": "127.0.0.1", "USER": "root", "DATABASES": "\"[]\"", "TABLES": "\"[]\"", "SQL_TEXT": "\"select \\\"[=]\\\"\"", "ROWS": "0", "CONNECTION_ID": "3695181836", "CLIENT_PORT": "63912", "PID": "61215", "COMMAND": "Query", "SQL_STATEMENTS": "Select"}, + kvs: map[string]string{auditPluginKeyLogTime: "2025/09/06 17:03:53.720 +08:00", "ID": "17571494330", "TIMESTAMP": "2025/09/06 17:03:53.720 +08:00", "EVENT_CLASS": "GENERAL", "EVENT_SUBCLASS": "", "STATUS_CODE": "0", "COST_TIME": "1336.083", "HOST": "127.0.0.1", "CLIENT_IP": "127.0.0.1", "USER": "root", "DATABASES": "\"[]\"", "TABLES": "\"[]\"", "SQL_TEXT": "\"select \\\"[=]\\\"\"", "ROWS": "0", "CONNECTION_ID": "3695181836", "CLIENT_PORT": "63912", "PID": "61215", "COMMAND": "Query", "SQL_STATEMENTS": "Select"}, hasErr: false, }, { line: `[2025/09/06 17:03:53.717 +08:00] [INFO] [logger.go:77] [ID=17571494330] [TIMESTAMP=2025/09/06 17:03:53.717 +08:00] [EVENT_CLASS=GENERAL] [EVENT_SUBCLASS=] [STATUS_CODE=0] [COST_TIME=824806376.375] [HOST=127.0.0.1] [CLIENT_IP=127.0.0.1] [USER=root] [DATABASES="[]"] [TABLES="[]"] [SQL_TEXT="select \"\n\""] [ROWS=0] [CONNECTION_ID=3695181836] [CLIENT_PORT=63912] [PID=61215] [COMMAND=Query] [SQL_STATEMENTS=Select]`, - kvs: map[string]string{"ID": "17571494330", "TIMESTAMP": "2025/09/06 17:03:53.717 +08:00", "EVENT_CLASS": "GENERAL", "EVENT_SUBCLASS": "", "STATUS_CODE": "0", "COST_TIME": "824806376.375", "HOST": "127.0.0.1", "CLIENT_IP": "127.0.0.1", "USER": "root", "DATABASES": "\"[]\"", "TABLES": "\"[]\"", "SQL_TEXT": "\"select \\\"\\n\\\"\"", "ROWS": "0", "CONNECTION_ID": "3695181836", "CLIENT_PORT": "63912", "PID": "61215", "COMMAND": "Query", "SQL_STATEMENTS": "Select"}, + kvs: map[string]string{auditPluginKeyLogTime: "2025/09/06 17:03:53.717 +08:00", "ID": "17571494330", "TIMESTAMP": "2025/09/06 17:03:53.717 +08:00", "EVENT_CLASS": "GENERAL", "EVENT_SUBCLASS": "", "STATUS_CODE": "0", "COST_TIME": "824806376.375", "HOST": "127.0.0.1", "CLIENT_IP": "127.0.0.1", "USER": "root", "DATABASES": "\"[]\"", "TABLES": "\"[]\"", "SQL_TEXT": "\"select \\\"\\n\\\"\"", "ROWS": "0", "CONNECTION_ID": "3695181836", "CLIENT_PORT": "63912", "PID": "61215", "COMMAND": "Query", "SQL_STATEMENTS": "Select"}, hasErr: false, }, { line: `[2025/09/06 16:50:08.917 +08:00] [INFO] [logger.go:77] [ID=17571486080] [TIMESTAMP=2025/09/06 16:50:08.917 +08:00] [EVENT_CLASS=GENERAL] [EVENT_SUBCLASS=] [STATUS_CODE=0] [COST_TIME=2442.333] [HOST=127.0.0.1] [CLIENT_IP=127.0.0.1] [USER=root] [DATABASES="[]"] [TABLES="[]"] [SQL_TEXT="select \"\n\""] [ROWS=0] [CONNECTION_ID=3695181836] [CLIENT_PORT=63912] [PID=61215] [COMMAND=Query] [SQL_STATEMENTS=Select]`, - kvs: map[string]string{"ID": "17571486080", "TIMESTAMP": "2025/09/06 16:50:08.917 +08:00", "EVENT_CLASS": "GENERAL", "EVENT_SUBCLASS": "", "STATUS_CODE": "0", "COST_TIME": "2442.333", "HOST": "127.0.0.1", "CLIENT_IP": "127.0.0.1", "USER": "root", "DATABASES": "\"[]\"", "TABLES": "\"[]\"", "SQL_TEXT": "\"select \\\"\\n\\\"\"", "ROWS": "0", "CONNECTION_ID": "3695181836", "CLIENT_PORT": "63912", "PID": "61215", "COMMAND": "Query", "SQL_STATEMENTS": "Select"}, + kvs: map[string]string{auditPluginKeyLogTime: "2025/09/06 16:50:08.917 +08:00", "ID": "17571486080", "TIMESTAMP": "2025/09/06 16:50:08.917 +08:00", "EVENT_CLASS": "GENERAL", "EVENT_SUBCLASS": "", "STATUS_CODE": "0", "COST_TIME": "2442.333", "HOST": "127.0.0.1", "CLIENT_IP": "127.0.0.1", "USER": "root", "DATABASES": "\"[]\"", "TABLES": "\"[]\"", "SQL_TEXT": "\"select \\\"\\n\\\"\"", "ROWS": "0", "CONNECTION_ID": "3695181836", "CLIENT_PORT": "63912", "PID": "61215", "COMMAND": "Query", "SQL_STATEMENTS": "Select"}, hasErr: false, }, } diff --git a/pkg/sqlreplay/cmd/cmd.go b/pkg/sqlreplay/cmd/cmd.go index ea25ec6a4..4e0669cb7 100644 --- a/pkg/sqlreplay/cmd/cmd.go +++ b/pkg/sqlreplay/cmd/cmd.go @@ -15,11 +15,23 @@ import ( "go.uber.org/zap" ) +// TrafficFormat is the supported format of traffic files. +type TrafficFormat string + const ( - FormatNative = "native" - FormatAuditLogPlugin = "audit_log_plugin" + FormatNative TrafficFormat = "native" + FormatAuditLogPlugin TrafficFormat = "audit_log_plugin" + FormatAuditLogExtension TrafficFormat = "audit_log_extension" ) +func (f TrafficFormat) String() string { + return string(f) +} + +func (f TrafficFormat) IsAuditLogFormat() bool { + return f == FormatAuditLogPlugin || f == FormatAuditLogExtension +} + type LineReader interface { String() string ReadLine() ([]byte, string, int, error) @@ -27,7 +39,7 @@ type LineReader interface { Close() } -func NewCmdEncoder(_ string) CmdEncoder { +func NewCmdEncoder(_ TrafficFormat) CmdEncoder { // Only support writing native format return NewNativeEncoder() } @@ -36,10 +48,12 @@ type CmdEncoder interface { Encode(c *Command, writer *bytes.Buffer) error } -func NewCmdDecoder(format string, dedup *DeDup, lg *zap.Logger) CmdDecoder { +func NewCmdDecoder(format TrafficFormat, dedup *DeDup, lg *zap.Logger) CmdDecoder { switch format { case FormatAuditLogPlugin: return NewAuditLogPluginDecoder(dedup, lg) + case FormatAuditLogExtension: + return NewAuditLogExtensionDecoder(lg) default: return NewNativeDecoder() } diff --git a/pkg/sqlreplay/manager/job.go b/pkg/sqlreplay/manager/job.go index 16d849720..6431b15aa 100644 --- a/pkg/sqlreplay/manager/job.go +++ b/pkg/sqlreplay/manager/job.go @@ -10,6 +10,7 @@ import ( "github.com/pingcap/tidb/pkg/parser/ast" "github.com/pingcap/tiproxy/pkg/sqlreplay/capture" + "github.com/pingcap/tiproxy/pkg/sqlreplay/cmd" "github.com/pingcap/tiproxy/pkg/sqlreplay/replay" "github.com/siddontang/go/hack" ) @@ -134,13 +135,13 @@ type replayJob struct { type replayJob4Marshal struct { job4Marshal - LastCmdTs string `json:"last_cmd_ts,omitempty"` - Input string `json:"input,omitempty"` - Username string `json:"username,omitempty"` - Format string `json:"format,omitempty"` - Speed float64 `json:"speed,omitempty"` - ReadOnly bool `json:"readonly,omitempty"` - Addr string `json:"addr,omitempty"` + LastCmdTs string `json:"last_cmd_ts,omitempty"` + Input string `json:"input,omitempty"` + Username string `json:"username,omitempty"` + Format cmd.TrafficFormat `json:"format,omitempty"` + Speed float64 `json:"speed,omitempty"` + ReadOnly bool `json:"readonly,omitempty"` + Addr string `json:"addr,omitempty"` } func (job *replayJob) Type() JobType { diff --git a/pkg/sqlreplay/replay/mock_test.go b/pkg/sqlreplay/replay/mock_test.go index 096796ec3..fd6f8babd 100644 --- a/pkg/sqlreplay/replay/mock_test.go +++ b/pkg/sqlreplay/replay/mock_test.go @@ -163,7 +163,7 @@ func newMockNormalLoader() *mockNormalLoader { return &mockNormalLoader{} } -func (m *mockNormalLoader) writeCommand(command *cmd.Command, format string) { +func (m *mockNormalLoader) writeCommand(command *cmd.Command, format cmd.TrafficFormat) { encoder := cmd.NewCmdEncoder(format) _ = encoder.Encode(command, &m.buf) } diff --git a/pkg/sqlreplay/replay/replay.go b/pkg/sqlreplay/replay/replay.go index 45ec91548..1ba52aad2 100644 --- a/pkg/sqlreplay/replay/replay.go +++ b/pkg/sqlreplay/replay/replay.go @@ -83,7 +83,7 @@ type Replay interface { } type ReplayConfig struct { - Format string + Format cmd.TrafficFormat Input string Username string Password string `json:"-"` @@ -145,7 +145,7 @@ func (cfg *ReplayConfig) Validate() ([]storage.ExternalStorage, error) { return nil, errors.New("input is required") } inputs := strings.Split(cfg.Input, ",") - if len(inputs) > 1 && cfg.Format != cmd.FormatAuditLogPlugin { + if len(inputs) > 1 && !cfg.Format.IsAuditLogFormat() { return nil, errors.New("only `audit_log_plugin` format supports multiple input files") } var storages []storage.ExternalStorage @@ -176,7 +176,7 @@ func (cfg *ReplayConfig) Validate() ([]storage.ExternalStorage, error) { return storages, errors.Errorf("speed should be between %f and %f", minSpeed, maxSpeed) } switch cfg.Format { - case cmd.FormatAuditLogPlugin, cmd.FormatNative, "": + case cmd.FormatAuditLogPlugin, cmd.FormatAuditLogExtension, cmd.FormatNative, "": default: return storages, errors.Errorf("invalid traffic file format %s", cfg.Format) } @@ -207,10 +207,22 @@ func (cfg *ReplayConfig) Validate() ([]storage.ExternalStorage, error) { default: return storages, errors.Errorf("invalid prepared statement close strategy %s", cfg.PSCloseStrategy) } - if cfg.Format != cmd.FormatAuditLogPlugin && !cfg.CommandEndTime.IsZero() { + if !cfg.Format.IsAuditLogFormat() && !cfg.CommandEndTime.IsZero() { return storages, errors.New("command end time is only supported for `audit_log_plugin` format") } + if cfg.Format == cmd.FormatAuditLogExtension { + if cfg.PSCloseStrategy == cmd.PSCloseStrategyDirected { + return storages, errors.New("prepared statement directed close strategy is not supported for audit log plugin v2 format") + } + if cfg.FilterCommandWithRetry { + return storages, errors.New("filtering commands with retry is not supported for audit log plugin v2 format") + } + if !cfg.CommandStartTime.IsZero() { + return storages, errors.New("command start time filter is not supported for audit log plugin v2 format") + } + } + if cfg.DynamicInput { if len(storages) != 1 { return storages, errors.New("dynamic input cannot be enabled with more than one input") @@ -258,7 +270,7 @@ func (cfg *ReplayConfig) LoadFromCheckpoint() error { cfg.CommandStartTime = time.Unix(0, state.CurCmdTs) } // Only load `CommandEndTime` for `audit_log_plugin` format, or it'll not pass validation. - if state.CurCmdEndTs > 0 && cfg.Format == cmd.FormatAuditLogPlugin { + if state.CurCmdEndTs > 0 && cfg.Format.IsAuditLogFormat() { cfg.CommandEndTime = time.Unix(0, state.CurCmdEndTs) } return nil @@ -616,7 +628,7 @@ func (r *replay) constructDecoderForReader(ctx context.Context, reader cmd.LineR // impossible for decoder to know whether `use xxx` will be executed, and thus cannot maintain // the current session state correctly. cmdDecoder.SetCommandStartTime(r.cfg.CommandStartTime) - if auditLogDecoder, ok := cmdDecoder.(*cmd.AuditLogPluginDecoder); ok { + if auditLogDecoder, ok := cmdDecoder.(cmd.AuditLogDecoder); ok { auditLogDecoder.SetPSCloseStrategy(r.cfg.PSCloseStrategy) auditLogDecoder.SetIDAllocator(idAllocator) auditLogDecoder.SetCommandEndTime(r.cfg.CommandEndTime) diff --git a/pkg/sqlreplay/store/line.go b/pkg/sqlreplay/store/line.go index 84b3f6772..13a4e62da 100644 --- a/pkg/sqlreplay/store/line.go +++ b/pkg/sqlreplay/store/line.go @@ -30,7 +30,7 @@ func NewWriter(lg *zap.Logger, externalStorage storage.ExternalStorage, cfg Writ type ReaderCfg struct { Dir string - Format string + Format cmd.TrafficFormat EncryptionMethod string EncryptionKey []byte // Reader will skip the files whose end time is before FileNameFilterTime. diff --git a/pkg/sqlreplay/store/rotate.go b/pkg/sqlreplay/store/rotate.go index b1b5f052d..b33f0beab 100644 --- a/pkg/sqlreplay/store/rotate.go +++ b/pkg/sqlreplay/store/rotate.go @@ -281,7 +281,7 @@ func (r *rotateReader) openFileLoop(ctx context.Context) error { } fileTime := parseFileTime(name, fileNamePrefix) if fileTime.IsZero() { - r.lg.Warn("traffic file name is invalid", zap.String("filename", name), zap.String("format", r.cfg.Format)) + r.lg.Warn("traffic file name is invalid", zap.String("filename", name), zap.String("format", string(r.cfg.Format))) return false, nil } if !fileTime.After(curFileTime) { @@ -451,9 +451,8 @@ func (r *rotateReader) walkS3(ctx context.Context, curFileName string, s3api s3i return nil } -func getFileNamePrefix(format string) string { - switch format { - case cmd.FormatAuditLogPlugin: +func getFileNamePrefix(format cmd.TrafficFormat) string { + if format.IsAuditLogFormat() { return auditFileNamePrefix } return fileNamePrefix diff --git a/pkg/sqlreplay/store/rotate_test.go b/pkg/sqlreplay/store/rotate_test.go index cb7a0334a..aed509e8a 100644 --- a/pkg/sqlreplay/store/rotate_test.go +++ b/pkg/sqlreplay/store/rotate_test.go @@ -145,7 +145,7 @@ func mustParseTime(s string) time.Time { func TestIterateFiles(t *testing.T) { tests := []struct { - format string + format cmd.TrafficFormat fileNames []string order []string }{ From 4ea5cf44c0ee71f470d3f2675c47031becde4c8e Mon Sep 17 00:00:00 2001 From: Yang Keao Date: Wed, 27 May 2026 22:14:43 +0800 Subject: [PATCH 2/2] sqlreplay: harden audit log extension decoder --- pkg/sqlreplay/cmd/audit_log_extension.go | 38 ++++++-- pkg/sqlreplay/cmd/audit_log_extension_test.go | 92 +++++++++++++++++-- 2 files changed, 116 insertions(+), 14 deletions(-) diff --git a/pkg/sqlreplay/cmd/audit_log_extension.go b/pkg/sqlreplay/cmd/audit_log_extension.go index c86636852..5d075bbf0 100644 --- a/pkg/sqlreplay/cmd/audit_log_extension.go +++ b/pkg/sqlreplay/cmd/audit_log_extension.go @@ -4,7 +4,6 @@ package cmd import ( - "fmt" "strconv" "strings" "time" @@ -23,6 +22,7 @@ type AuditLogExtensionDecoder struct { // pendingCmds contains the commands that has not been returned yet. pendingCmds []*Command psCloseStrategy PSCloseStrategy + userAllowlist map[string]struct{} idAllocator *ConnIDAllocator lg *zap.Logger } @@ -42,7 +42,23 @@ func (decoder *AuditLogExtensionDecoder) EnableFilterCommandWithRetry() { // SetUserAllowlist implements [AuditLogDecoder]. func (decoder *AuditLogExtensionDecoder) SetUserAllowlist(users []string) { - // do nothing for extension decoder, it's not supported yet + if len(users) == 0 { + decoder.userAllowlist = nil + return + } + m := make(map[string]struct{}) + for _, u := range users { + u = strings.ToLower(strings.TrimSpace(u)) + if u == "" { + continue + } + m[u] = struct{}{} + } + if len(m) == 0 { + decoder.userAllowlist = nil + } else { + decoder.userAllowlist = m + } } // SetCommandEndTime implements [AuditLogDecoder]. @@ -66,11 +82,6 @@ func (decoder *AuditLogExtensionDecoder) SetCommandStartTime(t time.Time) { } func (decoder *AuditLogExtensionDecoder) Decode(reader LineReader) (retCmd *Command, err error) { - defer func() { - if retCmd != nil { - fmt.Println("Decoded command:", retCmd.ConnID, retCmd.Line, retCmd.StartTs, retCmd.EndTs, "error:", err) - } - }() if len(decoder.pendingCmds) > 0 { cmd := decoder.pendingCmds[0] decoder.pendingCmds = decoder.pendingCmds[1:] @@ -99,10 +110,19 @@ func (decoder *AuditLogExtensionDecoder) Decode(reader LineReader) (retCmd *Comm // TODO: add both startTs and endTs in extension log. We only have the endTS is the current format. endTs, err := time.Parse(timeLayout, kvs[auditPluginKeyLogTime]) + if err != nil { + return nil, errors.Errorf("%s, line %d: parsing timestamp failed: %s", filename, lineIdx, kvs[auditPluginKeyLogTime]) + } if endTs.Before(decoder.commandEndTime) { // Ignore the commands before CommandEndTime. continue } + if decoder.userAllowlist != nil { + user := strings.ToLower(strings.TrimSpace(kvs[auditPluginKeyUser])) + if _, ok := decoder.userAllowlist[user]; !ok { + continue + } + } var connID uint64 if connCtx, ok := decoder.connInfo[upstreamConnID]; ok { @@ -189,6 +209,7 @@ func (decoder *AuditLogExtensionDecoder) parseQueryEvent(kvs map[string]string, if events[0] == "QUERY" && len(events) > 1 && events[1] == "EXECUTE" { params, ok := kvs[auditPluginKeyParams] if !ok { + // Redacted extension logs omit EXECUTE_PARAMS, and the original prepared values can't be recovered. return nil, nil } args, err := parseExecuteParamsForExtension(params) @@ -277,6 +298,9 @@ func parseExecuteParamsForExtension(value string) ([]any, error) { if err != nil { return nil, errors.Wrapf(err, "unquote execute params failed: %s", value) } + if len(v) == 0 { + return nil, errors.Errorf("no brackets in params: %s", value) + } if v[0] != '[' || v[len(v)-1] != ']' { return nil, errors.Errorf("no brackets in params: %s", value) } diff --git a/pkg/sqlreplay/cmd/audit_log_extension_test.go b/pkg/sqlreplay/cmd/audit_log_extension_test.go index 4d748f78a..3fa9f5b34 100644 --- a/pkg/sqlreplay/cmd/audit_log_extension_test.go +++ b/pkg/sqlreplay/cmd/audit_log_extension_test.go @@ -4,6 +4,8 @@ package cmd import ( + "io" + "os" "testing" "time" @@ -375,6 +377,62 @@ func TestDecodeAuditExtensionInNeverMode(t *testing.T) { } } +func TestDecodeAuditExtensionUserAllowlist(t *testing.T) { + decoder := NewAuditLogExtensionDecoder(zap.NewNop()) + decoder.SetUserAllowlist([]string{" ROOT "}) + lines := auditExtensionLine("root", "1", "QUERY,QUERY_DML,SELECT", `"SELECT 1"`, "") + "\n" + + auditExtensionLine("app", "2", "QUERY,QUERY_DML,SELECT", `"SELECT 2"`, "") + "\n" + mr := mockReader{data: []byte(lines)} + + cmds, err := decodeCmds(decoder, &mr) + require.ErrorIs(t, err, io.EOF) + require.Len(t, cmds, 1) + require.Equal(t, append([]byte{pnet.ComQuery.Byte()}, []byte("SELECT 1")...), cmds[0].Payload) +} + +func TestDecodeAuditExtensionDoesNotPrintToStdout(t *testing.T) { + decoder := NewAuditLogExtensionDecoder(zap.NewNop()) + mr := mockReader{data: []byte(auditExtensionLine("root", "1", "QUERY,QUERY_DML,SELECT", `"SELECT 1"`, "") + "\n")} + + stdout := os.Stdout + r, w, err := os.Pipe() + require.NoError(t, err) + os.Stdout = w + cmd, decodeErr := decoder.Decode(&mr) + require.NoError(t, w.Close()) + os.Stdout = stdout + t.Cleanup(func() { + os.Stdout = stdout + r.Close() + }) + out, err := io.ReadAll(r) + require.NoError(t, err) + + require.NoError(t, decodeErr) + require.NotNil(t, cmd) + require.Empty(t, string(out)) +} + +func TestDecodeAuditExtensionInvalidTimestamp(t *testing.T) { + decoder := NewAuditLogExtensionDecoder(zap.NewNop()) + line := `[not-a-time] [INFO] [EVENT="[QUERY,QUERY_DML,SELECT]"] [USER=root] [CONNECTION_ID=1] [STATUS_CODE=1] [CURRENT_DB=test] [SQL_TEXT="SELECT 1"]` + mr := mockReader{data: []byte(line + "\n")} + + cmd, err := decoder.Decode(&mr) + require.Nil(t, cmd) + require.ErrorContains(t, err, "parsing timestamp failed: not-a-time") +} + +func TestDecodeAuditExtensionExecuteWithoutParams(t *testing.T) { + decoder := NewAuditLogExtensionDecoder(zap.NewNop()) + decoder.SetPSCloseStrategy(PSCloseStrategyAlways) + mr := mockReader{data: []byte(auditExtensionLine("root", "1", "QUERY,EXECUTE,SELECT", `"SELECT ?"`, "") + "\n")} + + cmds, err := decodeCmds(decoder, &mr) + require.ErrorIs(t, err, io.EOF) + require.Empty(t, cmds) +} + func TestParseExecuteParamsForExtension(t *testing.T) { tests := []struct { input string @@ -436,6 +494,11 @@ func TestParseExecuteParamsForExtension(t *testing.T) { expected: []any{"hello, world"}, hasError: false, }, + { + input: `"[\"\"]"`, + expected: []any{""}, + hasError: false, + }, { input: `"[1.5,2.75,3.14]"`, expected: []any{"1.5", "2.75", "3.14"}, @@ -466,15 +529,30 @@ func TestParseExecuteParamsForExtension(t *testing.T) { expected: nil, hasError: true, }, + { + input: `""`, + expected: nil, + hasError: true, + }, } for _, tt := range tests { - result, err := parseExecuteParamsForExtension(tt.input) - if tt.hasError { - require.Error(t, err) - } else { - require.NoError(t, err) - require.Equal(t, tt.expected, result) - } + require.NotPanics(t, func() { + result, err := parseExecuteParamsForExtension(tt.input) + if tt.hasError { + require.Error(t, err) + } else { + require.NoError(t, err) + require.Equal(t, tt.expected, result) + } + }) + } +} + +func auditExtensionLine(user, connID, event, sql, params string) string { + line := `[2026/01/08 19:44:11.099 +08:00] [INFO] [EVENT="[` + event + `]"] [USER=` + user + `] [ROLES="[]"] [CONNECTION_ID=` + connID + `] [SESSION_ALIAS=] [TABLES="[]"] [STATUS_CODE=1] [CURRENT_DB=test] [SQL_TEXT=` + sql + `]` + if params != "" { + line += ` [EXECUTE_PARAMS=` + params + `]` } + return line }