From c24b30c4f7fcdb1a3e23d1c812078799d800a8cb Mon Sep 17 00:00:00 2001 From: xuleiming Date: Wed, 11 Feb 2026 21:20:22 +0800 Subject: [PATCH 1/2] some fixes in AI related functions. Signed-off-by: xuleiming --- bfe_basic/condition/parser/scanner.go | 2 +- bfe_basic/condition/primitive.go | 31 ++++++++++++---- bfe_http/common.go | 4 +++ bfe_modules/mod_ai_token_auth/token.go | 4 +-- .../mod_ai_token_auth/token_rule_table.go | 2 +- bfe_modules/mod_body_process/body_process.go | 33 +++++++++++------ .../body_process_rule_load.go | 27 ++++++++++++-- bfe_server/reverseproxy.go | 35 ++++++++++++++----- 8 files changed, 105 insertions(+), 33 deletions(-) diff --git a/bfe_basic/condition/parser/scanner.go b/bfe_basic/condition/parser/scanner.go index 9ea2d8b77..8c14244fd 100644 --- a/bfe_basic/condition/parser/scanner.go +++ b/bfe_basic/condition/parser/scanner.go @@ -181,7 +181,7 @@ func (s *Scanner) scanEscape(quote rune) bool { var n int var base, max uint32 switch s.ch { - case 'a', 'b', 'f', 'n', 'r', 't', 'v', '\\', quote: + case 'a', 'b', 'f', 'n', 'r', 't', 'v', '.', '\\', quote: s.next() return true case '0', '1', '2', '3', '4', '5', '6', '7': diff --git a/bfe_basic/condition/primitive.go b/bfe_basic/condition/primitive.go index cc61c3e71..40e872cf2 100644 --- a/bfe_basic/condition/primitive.go +++ b/bfe_basic/condition/primitive.go @@ -29,6 +29,7 @@ import ( "github.com/bfenetworks/bfe/bfe_basic" "github.com/bfenetworks/bfe/bfe_basic/condition/parser" + "github.com/bfenetworks/bfe/bfe_http" "github.com/bfenetworks/bfe/bfe_util" "github.com/bfenetworks/bfe/bfe_util/net_util" "github.com/spaolacci/murmur3" @@ -1095,10 +1096,10 @@ type ReqBodyJsonFetcher struct{ } func (pf *ReqBodyJsonFetcher) Fetch(req *bfe_basic.Request) (interface{}, error) { - return ReqBodyJsonFetch(req, pf.path) + return ReqBodyJsonFetch(req, pf.path, nil) } -func ReqBodyJsonFetch(req *bfe_basic.Request, path string) (string, error) { +func ReqBodyJsonFetch(req *bfe_basic.Request, path string, httpreq *bfe_http.Request) (string, error) { const jsonCachePrefix = "jsoncache." if req == nil || req.HttpRequest == nil { @@ -1114,7 +1115,26 @@ func ReqBodyJsonFetch(req *bfe_basic.Request, path string) (string, error) { } } - bodyAccessor, err := req.HttpRequest.GetBodyAccessor() + if httpreq == nil { + httpreq = req.HttpRequest + } + + str, err := HttpReqBodyJsonGet(httpreq, path) + + if err != nil { + return "", err + } + + req.SetContext(cachepath, str) + return str, nil +} + +func HttpReqBodyJsonGet(req *bfe_http.Request, path string) (string, error) { + if req == nil { + return "", fmt.Errorf("fetcher: nil pointer") + } + + bodyAccessor, err := req.GetBodyAccessor() if bodyAccessor == nil { return "", err } @@ -1122,13 +1142,10 @@ func ReqBodyJsonFetch(req *bfe_basic.Request, path string) (string, error) { body, _ := bodyAccessor.GetBytes() val := gjson.GetBytes(body, path) if !val.Exists() { - req.SetContext(cachepath, "") return "", nil } - str := val.String() - req.SetContext(cachepath, str) - return str, nil + return val.String(), nil } func ReqBodyJsonSet(req *bfe_basic.Request, path string, value string) error { diff --git a/bfe_http/common.go b/bfe_http/common.go index 4ce7f8909..b5632b143 100644 --- a/bfe_http/common.go +++ b/bfe_http/common.go @@ -319,6 +319,10 @@ type WriteRequestError struct { Err error } +func (e WriteRequestError) Unwrap() error { + return e.Err +} + func (e WriteRequestError) Error() string { return fmt.Sprintf("WriteRequestError: %s", e.Err.Error()) } diff --git a/bfe_modules/mod_ai_token_auth/token.go b/bfe_modules/mod_ai_token_auth/token.go index 2e7cbeb60..d8d981088 100644 --- a/bfe_modules/mod_ai_token_auth/token.go +++ b/bfe_modules/mod_ai_token_auth/token.go @@ -78,7 +78,7 @@ func tokenCheck(conf *TokenFile) error { if conf.UnlimitedQuota && conf.RemainQuota != 0 { return errors.New("if UnlimitedQuota is true, RemainQuota must be 0") } - if conf.Models != nil { + if conf.Models != nil && *conf.Models != "" { conf.models = strings.Split(*conf.Models, ",") for i := 0; i < len(conf.models); i++ { conf.models[i] = strings.TrimSpace(conf.models[i]) @@ -87,7 +87,7 @@ func tokenCheck(conf *TokenFile) error { } } } - if conf.Subnet != nil { + if conf.Subnet != nil && *conf.Subnet != "" { res := strings.Split(*conf.Subnet, ",") conf.subnet = make([]*net.IPNet, len(res)) for i := 0; i < len(res); i++ { diff --git a/bfe_modules/mod_ai_token_auth/token_rule_table.go b/bfe_modules/mod_ai_token_auth/token_rule_table.go index c6c75d4e4..fb80825ce 100644 --- a/bfe_modules/mod_ai_token_auth/token_rule_table.go +++ b/bfe_modules/mod_ai_token_auth/token_rule_table.go @@ -181,7 +181,7 @@ func (m *ModuleAITokenAuth) ValidateUserTokenByReq(req *bfe_basic.Request) (toke } if len(token.Models) > 0 { - model, err := condition.ReqBodyJsonFetch(req, "model") + model, err := condition.ReqBodyJsonFetch(req, "model", nil) if err != nil || model == "" { return nil, fmt.Errorf("model not found in request body: %v", err) } diff --git a/bfe_modules/mod_body_process/body_process.go b/bfe_modules/mod_body_process/body_process.go index f8bdd76fc..ee76a7f9c 100644 --- a/bfe_modules/mod_body_process/body_process.go +++ b/bfe_modules/mod_body_process/body_process.go @@ -75,6 +75,18 @@ func (e *RejectionError) Error() string { return e.Message } +type BPError struct { + Err error +} + +func (e *BPError) Unwrap() error { + return e.Err +} + +func (e *BPError) Error() string { + return fmt.Sprintf("BodyProcessError: %s", e.Err.Error()) +} + type Event interface { // GetType() string // GetData() []byte @@ -107,7 +119,7 @@ func (bp *BodyProcessor) CreateEventDecoder(fac EventDecoderFac) { // defer bp.mu.Unlock() dec, err := fac(bp.source) if err != nil { - bp.err = fmt.Errorf("create event decoder: %w", err) + bp.err = &BPError{fmt.Errorf("create event decoder: %w", err)} return } bp.decoder = dec @@ -118,7 +130,7 @@ func (bp *BodyProcessor) CreateEventEncoder(fac EventEncoderFac) { // defer bp.mu.Unlock() enc, err := fac(bp.buffer) if err != nil { - bp.err = fmt.Errorf("create event encoder: %w", err) + bp.err = &BPError{fmt.Errorf("create event encoder: %w", err)} return } bp.encoder = enc @@ -166,8 +178,8 @@ func (bp *BodyProcessor) fillBuffer() error { for { events, decodeErr := bp.decoder.Decode() if decodeErr != nil { - bp.err = decodeErr - return decodeErr + bp.err = &BPError{decodeErr} + return bp.err } if len(events) == 0 { bp.err = io.EOF @@ -182,20 +194,20 @@ func (bp *BodyProcessor) fillBuffer() error { var processErr error events, processErr = processor.Process(events) if processErr != nil { - bp.err = processErr + bp.err = &BPError{processErr} // 检查是否为中断错误 if cvErr, ok := processErr.(*RejectionError); ok { bp.handleRejection(cvErr) return cvErr } - return processErr + return bp.err } } // 编码事件 n, encodeErr := bp.encoder.Encode(events) if encodeErr != nil { - bp.err = encodeErr - return encodeErr + bp.err = &BPError{encodeErr} + return bp.err } if n > 0 { break // 至少有一个事件被处理 @@ -226,7 +238,6 @@ func (bp *BodyProcessor) Close() error { // bp.mu.Lock() // defer bp.mu.Unlock() - bp.buffer.Reset() return bp.source.Close() } @@ -507,7 +518,7 @@ func (dec *LineDecoder) Decode() ([]Event, error) { if err == io.EOF { return []Event{}, nil // 没有更多数据 } - return nil, fmt.Errorf("line decode error: %w", err) + return nil, err } type JsonDecoder struct { @@ -526,7 +537,7 @@ func (dec *JsonDecoder) Decode() ([]Event, error) { if err == io.EOF { return []Event{}, nil // 没有更多数据 } - return nil, fmt.Errorf("json decode error: %w", err) + return nil, err } re := RawEvent(event) return []Event{&re}, nil diff --git a/bfe_modules/mod_body_process/body_process_rule_load.go b/bfe_modules/mod_body_process/body_process_rule_load.go index 4c3f9c574..ff2d38a5c 100644 --- a/bfe_modules/mod_body_process/body_process_rule_load.go +++ b/bfe_modules/mod_body_process/body_process_rule_load.go @@ -36,6 +36,27 @@ type BodyProcessConfig struct { } func BodyProcessConfigCheck(config *BodyProcessConfig) error { + if config == nil { + return nil + } + + if config.Dec != "" && config.Dec != "line" && config.Dec != "json" && config.Dec != "sse" { + return errors.New("invalid Dec") + } + + if config.Enc != "" && config.Enc != "line" && config.Enc != "json" && config.Enc != "sse" { + return errors.New("invalid Enc") + } + + for _, proc := range config.Proc { + if proc.Name != "textfilter" { + return errors.New("invalid Proc name") + } + if len(proc.Params) != 1 { + return errors.New("invalid Proc params, textfilter need only 1 param") + } + } + return nil } @@ -72,9 +93,9 @@ func processRuleCheck(conf processRuleFile) error { return errors.New("no Cond") } - if conf.RequestProcess == nil && conf.ResponseProcess == nil { - return errors.New("no RequestProcess or ResponseProcess") - } + // if conf.RequestProcess == nil && conf.ResponseProcess == nil { + // return errors.New("no RequestProcess or ResponseProcess") + // } if err := BodyProcessConfigCheck(conf.RequestProcess); err != nil { return err diff --git a/bfe_server/reverseproxy.go b/bfe_server/reverseproxy.go index a1c6a7f1f..4910b4b3e 100644 --- a/bfe_server/reverseproxy.go +++ b/bfe_server/reverseproxy.go @@ -22,7 +22,7 @@ package bfe_server import ( "crypto/tls" - "fmt" + "errors" "io" "net" "reflect" @@ -46,6 +46,7 @@ import ( "github.com/bfenetworks/bfe/bfe_http2" "github.com/bfenetworks/bfe/bfe_module" "github.com/bfenetworks/bfe/bfe_modules/mod_ai_token_auth" + "github.com/bfenetworks/bfe/bfe_modules/mod_body_process" "github.com/bfenetworks/bfe/bfe_route" "github.com/bfenetworks/bfe/bfe_route/bfe_cluster" "github.com/bfenetworks/bfe/bfe_spdy" @@ -440,6 +441,17 @@ func (p *ReverseProxy) clusterInvoke(srv *BfeServer, cluster *bfe_cluster.BfeClu backend.OnFailByCluster(cluster) case bfe_http.WriteRequestError, bfe_fcgi.WriteRequestError: + var be *mod_body_process.BPError + if errors.As(err, &be) { + // body process error, no retry + request.ErrCode = bfe_basic.ErrBkBodyProcess + request.ErrMsg = err.Error() + p.proxyState.ErrBkBodyProcess.Inc(1) + allowRetry = false + action = closeAfterReply + break + } + request.ErrCode = bfe_basic.ErrBkWriteRequest request.ErrMsg = err.Error() p.proxyState.ErrBkWriteRequest.Inc(1) @@ -634,8 +646,8 @@ func (p *ReverseProxy) ServeHTTP(rw bfe_http.ResponseWriter, basicReq *bfe_basic var outreq *bfe_http.Request var serverConf *bfe_route.ServerDataConf var writeTimer *time.Timer - var bf BufferFiller - var ok bool + // var bf BufferFiller + // var ok bool req := basicReq.HttpRequest isRedirect := false @@ -665,7 +677,7 @@ func (p *ReverseProxy) ServeHTTP(rw bfe_http.ResponseWriter, basicReq *bfe_basic // close the connection after response action = closeAfterReply basicReq.BfeStatusCode = bfe_http.StatusInternalServerError - return + goto send_response case bfe_module.BfeHandlerRedirect: // make redirect Redirect(rw, req, basicReq.Redirect.Url, basicReq.Redirect.Code, basicReq.Redirect.Header) @@ -705,7 +717,7 @@ func (p *ReverseProxy) ServeHTTP(rw bfe_http.ResponseWriter, basicReq *bfe_basic // close the connection after response action = closeAfterReply basicReq.BfeStatusCode = bfe_http.StatusInternalServerError - return + goto send_response case bfe_module.BfeHandlerRedirect: // make redirect Redirect(rw, req, basicReq.Redirect.Url, basicReq.Redirect.Code, basicReq.Redirect.Header) @@ -808,7 +820,7 @@ func (p *ReverseProxy) ServeHTTP(rw bfe_http.ResponseWriter, basicReq *bfe_basic mod_ai_token_auth.SetApiKey(outreq, *cluster.AIConf.Key) } if cluster.AIConf.ModelMapping != nil { - model, err := condition.ReqBodyJsonFetch(basicReq, "model") + model, err := condition.ReqBodyJsonFetch(basicReq, "model", outreq) if err == nil && model != "" { newModel, ok := (*cluster.AIConf.ModelMapping)[model] if ok { @@ -816,11 +828,18 @@ func (p *ReverseProxy) ServeHTTP(rw bfe_http.ResponseWriter, basicReq *bfe_basic if err != nil { log.Logger.Warn("Failed to set model in request body: %s", err) // just continue, not return error + } else { + // outreq body already changed, need reset Content-Length + if outreq.ContentLength >= 0 { + outreq.ContentLength = -1 + outreq.Header.Del("Content-Length") + } } } } } } +/* // do body process before forwarding bf, ok = outreq.Body.(BufferFiller) if ok { @@ -841,7 +860,7 @@ func (p *ReverseProxy) ServeHTTP(rw bfe_http.ResponseWriter, basicReq *bfe_basic goto send_response } } - +*/ // invoke cluster to get response res, action, err = p.clusterInvoke(srv, cluster, basicReq, rw) basicReq.HttpResponse = res @@ -895,7 +914,7 @@ response_got: // close the connection after response action = closeAfterReply basicReq.BfeStatusCode = bfe_http.StatusInternalServerError - return + goto send_response case bfe_module.BfeHandlerRedirect: // make redirect Redirect(rw, req, basicReq.Redirect.Url, basicReq.Redirect.Code, basicReq.Redirect.Header) From 54d63f015d21a9da9717105e89aaa5d8f6f42b5b Mon Sep 17 00:00:00 2001 From: cc14514 Date: Wed, 11 Feb 2026 22:26:20 +0800 Subject: [PATCH 2/2] feat: add debug flag handling in entrypoint script for BFE Signed-off-by: cc14514 --- Dockerfile | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/Dockerfile b/Dockerfile index 7d7e1c042..81c19a18f 100644 --- a/Dockerfile +++ b/Dockerfile @@ -85,6 +85,8 @@ RUN set -ex; \ #!/bin/sh set -eu +CONF_AGENT_PID="" + # Log function log() { echo "[$(date +'%Y-%m-%d %H:%M:%S')] $*" @@ -108,7 +110,7 @@ start_conf_agent() { start_bfe() { log "Starting bfe..." cd /home/work/bfe/bin - exec ./bfe -c ../conf/ -l ../log/ -s + exec ./bfe -c ../conf/ -l ../log/ -s__BFE_DEBUG_FLAG__ } # Signal handler @@ -143,6 +145,13 @@ sleep 2 start_bfe EOF +RUN set -ex; \ + if [ "${VARIANT}" = "debug" ]; then \ + sed -i 's/__BFE_DEBUG_FLAG__/ -d debug/g' /home/work/entrypoint.sh; \ + else \ + sed -i 's/__BFE_DEBUG_FLAG__//g' /home/work/entrypoint.sh; \ + fi + RUN chmod +x /home/work/entrypoint.sh EXPOSE 8080 8443 8421