Skip to content

Commit

Permalink
feat: support prefix match and regex match (#151)
Browse files Browse the repository at this point in the history
* feat: support prefix match and regex match
  • Loading branch information
dk-lockdown committed Jun 13, 2022
1 parent 3642536 commit cdb757a
Show file tree
Hide file tree
Showing 3 changed files with 194 additions and 19 deletions.
110 changes: 92 additions & 18 deletions pkg/filter/dt/filter_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@
package dt

import (
"bytes"
"encoding/json"
"fmt"
"regexp"
"strings"

"github.com/pkg/errors"
Expand All @@ -30,6 +33,38 @@ import (

const httpFilter = "HttpDistributedTransaction"

type MatchType byte

const (
Exact MatchType = iota
Prefix
Regex
)

func (t *MatchType) UnmarshalText(text []byte) error {
if t == nil {
return errors.New("can't unmarshal a nil *MatchType")
}
if !t.unmarshalText(bytes.ToLower(text)) {
return fmt.Errorf("unsupported match type: %q", text)
}
return nil
}

func (t *MatchType) unmarshalText(text []byte) bool {
switch string(text) {
case "exact":
*t = Exact
case "prefix":
*t = Prefix
case "regex":
*t = Regex
default:
return false
}
return true
}

func init() {
filter.RegistryFilterFactory(httpFilter, &httpFactory{})
}
Expand All @@ -53,31 +88,40 @@ func (factory *httpFactory) NewFilter(config map[string]interface{}) (proto.Filt
}

f := &_httpFilter{
conf: filterConfig,
transactionInfos: make(map[string]*TransactionInfo),
tccResources: make(map[string]*TCCResource),
conf: filterConfig,
transactionInfoMap: make(map[string]*TransactionInfo),
transactionInfos: make([]*TransactionInfo, 0),
tccResourceInfoMap: make(map[string]*TccResourceInfo),
}

for _, ti := range filterConfig.TransactionInfos {
f.transactionInfos[strings.ToLower(ti.RequestPath)] = ti
switch ti.MatchType {
case Exact:
f.transactionInfoMap[strings.ToLower(ti.RequestPath)] = ti
case Prefix, Regex:
f.transactionInfos = append(f.transactionInfos, ti)
default:
log.Warnf("unsupported match type, %s, request path: %s", ti.MatchType, ti.RequestPath)
}
log.Debugf("proxy %s, will create global transaction, put xid into request header", ti.RequestPath)
}

for _, r := range filterConfig.TCCResources {
f.tccResources[strings.ToLower(r.PrepareRequestPath)] = r
for _, r := range filterConfig.TCCResourceInfos {
f.tccResourceInfoMap[strings.ToLower(r.PrepareRequestPath)] = r
log.Debugf("proxy %s, will register branch transaction", r.PrepareRequestPath)
}
return f, nil
}

// TransactionInfo transaction info config
type TransactionInfo struct {
RequestPath string `yaml:"request_path" json:"request_path"`
Timeout int32 `yaml:"timeout" json:"timeout"`
RequestPath string `yaml:"request_path" json:"request_path"`
Timeout int32 `yaml:"timeout" json:"timeout"`
MatchType MatchType `yaml:"match_type" json:"match_type"`
}

// TCCResource tcc resource config
type TCCResource struct {
// TccResourceInfo tcc resource config
type TccResourceInfo struct {
PrepareRequestPath string `yaml:"prepare_request_path" json:"prepare_request_path"`
CommitRequestPath string `yaml:"commit_request_path" json:"commit_request_path"`
RollbackRequestPath string `yaml:"rollback_request_path" json:"rollback_request_path"`
Expand All @@ -89,13 +133,15 @@ type HttpFilterConfig struct {
BackendHost string `yaml:"backend_host" json:"backend_host"`

TransactionInfos []*TransactionInfo `yaml:"transaction_infos" json:"transaction_infos"`
TCCResources []*TCCResource `yaml:"tcc_resources" json:"tcc_resources"`
TCCResourceInfos []*TccResourceInfo `yaml:"tcc_resource_infos" json:"tcc_resource_infos"`
}

type _httpFilter struct {
conf *HttpFilterConfig
transactionInfos map[string]*TransactionInfo
tccResources map[string]*TCCResource
conf *HttpFilterConfig

transactionInfoMap map[string]*TransactionInfo
transactionInfos []*TransactionInfo
tccResourceInfoMap map[string]*TccResourceInfo
}

func (f *_httpFilter) GetKind() string {
Expand All @@ -110,7 +156,7 @@ func (f _httpFilter) PreHandle(ctx *fasthttp.RequestCtx) error {
return nil
}

transactionInfo, found := f.transactionInfos[strings.ToLower(string(path))]
transactionInfo, found := f.matchTransactionInfo(string(path))
if found {
result, err := f.handleHttp1GlobalBegin(ctx, transactionInfo)
if !result {
Expand All @@ -121,7 +167,7 @@ func (f _httpFilter) PreHandle(ctx *fasthttp.RequestCtx) error {
return err
}

tccResource, exists := f.tccResources[strings.ToLower(string(path))]
tccResource, exists := f.tccResourceInfoMap[strings.ToLower(string(path))]
if exists {
result, err := f.handleHttp1BranchRegister(ctx, tccResource)
if !result {
Expand All @@ -142,18 +188,46 @@ func (f _httpFilter) PostHandle(ctx *fasthttp.RequestCtx) error {
return nil
}

_, found := f.transactionInfos[strings.ToLower(string(path))]
_, found := f.transactionInfoMap[strings.ToLower(string(path))]
if found {
if err := f.handleHttp1GlobalEnd(ctx); err != nil {
return err
}
}

_, exists := f.tccResources[strings.ToLower(string(path))]
_, exists := f.tccResourceInfoMap[strings.ToLower(string(path))]
if exists {
if err := f.handleHttp1BranchEnd(ctx); err != nil {
return err
}
}
return nil
}

func (f _httpFilter) matchTransactionInfo(requestPath string) (*TransactionInfo, bool) {
path := strings.ToLower(requestPath)
transactionInfo, found := f.transactionInfoMap[path]
if found {
return transactionInfo, found
}
for _, ti := range f.transactionInfos {
switch ti.MatchType {
case Prefix:
if strings.HasPrefix(path, strings.ToLower(ti.RequestPath)) {
return ti, true
}
case Regex:
matched, err := regexp.Match(strings.ToLower(ti.RequestPath), []byte(path))
if err != nil {
log.Warnf("regular expression match error, regex string: %s, error: %s", ti.RequestPath, err)
continue
}
if matched {
return ti, true
}
default:
continue
}
}
return nil, false
}
101 changes: 101 additions & 0 deletions pkg/filter/dt/filter_http_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
* Copyright 2022 CECTC, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package dt

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestMathTransactionInfo(t *testing.T) {
testCases := []*struct {
requestPath string
transactionInfo *TransactionInfo
matched bool
}{
{
"/v1/order/create",
&TransactionInfo{
RequestPath: "/v1/order/create",
Timeout: 60000,
MatchType: Exact,
},
true,
},
{
"/v1/order/create",
&TransactionInfo{
RequestPath: "/v1/order/",
Timeout: 60000,
MatchType: Prefix,
},
true,
},
{
"/v1/order/create",
&TransactionInfo{
RequestPath: "/v1/order(/.*)?",
Timeout: 60000,
MatchType: Regex,
},
true,
},
{
"/v1/order/delete",
&TransactionInfo{
RequestPath: "/v1/order/create",
Timeout: 60000,
MatchType: Exact,
},
false,
},
{
"/v1/so/create",
&TransactionInfo{
RequestPath: "/v1/order/",
Timeout: 60000,
MatchType: Prefix,
},
false,
},
{
"/v1/so/create",
&TransactionInfo{
RequestPath: "/v1/order(/.*)?",
Timeout: 60000,
MatchType: Regex,
},
false,
},
}
for _, c := range testCases {
t.Run(c.requestPath, func(t *testing.T) {
var filter *_httpFilter
switch c.transactionInfo.MatchType {
case Exact:
filter = &_httpFilter{transactionInfoMap: map[string]*TransactionInfo{
c.transactionInfo.RequestPath: c.transactionInfo,
}}
case Prefix, Regex:
filter = &_httpFilter{transactionInfos: []*TransactionInfo{c.transactionInfo}}
}
_, found := filter.matchTransactionInfo(c.requestPath)
assert.Equal(t, c.matched, found)
})
}
}
2 changes: 1 addition & 1 deletion pkg/filter/dt/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (f *_httpFilter) handleHttp1GlobalEnd(ctx *fasthttp.RequestCtx) error {
}

// handleHttp1BranchRegister return bool, represent whether continue
func (f *_httpFilter) handleHttp1BranchRegister(ctx *fasthttp.RequestCtx, tccResource *TCCResource) (bool, error) {
func (f *_httpFilter) handleHttp1BranchRegister(ctx *fasthttp.RequestCtx, tccResource *TccResourceInfo) (bool, error) {
xid := ctx.Request.Header.Peek(XID)
if string(xid) == "" {
ctx.Response.Reset()
Expand Down

0 comments on commit cdb757a

Please sign in to comment.