Skip to content

Commit

Permalink
[SS] When routing workflows, support adding git default branch to routing key as a fallback (buildbuddy-io#6144)
Browse files Browse the repository at this point in the history
  • Loading branch information
maggie-lou committed Mar 14, 2024
1 parent 59a5b10 commit 0dfd136
Show file tree
Hide file tree
Showing 2 changed files with 115 additions and 37 deletions.
103 changes: 66 additions & 37 deletions enterprise/server/scheduling/task_router/task_router.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ import (
)

var (
affinityRoutingEnabled = flag.Bool("executor.affinity_routing_enabled", true, "Enables affinity routing, which attempts to route actions to the executor that most recently ran that action.")
affinityRoutingEnabled = flag.Bool("executor.affinity_routing_enabled", true, "Enables affinity routing, which attempts to route actions to the executor that most recently ran that action.")
defaultBranchRoutingEnabled = flag.Bool("executor.workflow_default_branch_routing_enabled", false, "Enables default branch routing for workflows. When routing a workflow action, if there are no executors that ran that action for the same git branch, try to route it to an executor that ran the action for the same default branch.")
)

const (
Expand Down Expand Up @@ -114,7 +115,7 @@ func (tr *taskRouter) RankNodes(ctx context.Context, cmd *repb.Command, remoteIn
return nonePreferred(nodes)
}

preferredNodeLimit, routingKey, err := strategy.RoutingInfo(params)
preferredNodeLimit, routingKeys, err := strategy.RoutingInfo(params)
if err != nil {
log.Errorf("Failed to compute routing info: %s", err)
return nonePreferred(nodes)
Expand All @@ -123,31 +124,40 @@ func (tr *taskRouter) RankNodes(ctx context.Context, cmd *repb.Command, remoteIn
return nonePreferred(nodes)
}

preferredNodeIDs, err := tr.rdb.LRange(ctx, routingKey, 0, -1).Result()
if err != nil {
log.Errorf("Failed to rank nodes: redis LRANGE failed: %s", err)
return nonePreferred(nodes)
}

nodeByID := map[string]interfaces.ExecutionNode{}
for _, node := range nodes {
nodeByID[node.GetExecutorID()] = node
}
preferredSet := map[string]struct{}{}
ranked := make([]interfaces.RankedExecutionNode, 0, len(nodes))

log.Debugf("Preferred executor IDs for %q: %v", routingKey, preferredNodeIDs)
// Routing keys should be prioritized in the order they were returned
for _, routingKey := range routingKeys {
preferredNodeIDs, err := tr.rdb.LRange(ctx, routingKey, 0, -1).Result()
if err != nil {
log.Errorf("Failed to rank nodes: redis LRANGE failed: %s", err)
return nonePreferred(nodes)
}

log.Debugf("Preferred executor IDs for %q: %v", routingKey, preferredNodeIDs)

// Place all preferred nodes first (in the order that they appear in the Redis
// list), then the remaining ones in shuffled order.
for _, id := range preferredNodeIDs[:min(preferredNodeLimit, len(preferredNodeIDs))] {
node := nodeByID[id]
if node == nil {
continue
// Place all preferred nodes first.
// For each routing key, preferred nodes should be prioritized in the order
// they appear in the Redis list.
for _, id := range preferredNodeIDs[:min(preferredNodeLimit, len(preferredNodeIDs))] {
node := nodeByID[id]
if node == nil {
continue
}
if _, ok := preferredSet[node.GetExecutorID()]; ok {
continue
}
preferredSet[id] = struct{}{}
ranked = append(ranked, rankedExecutionNode{node: node, preferred: true})
}
preferredSet[id] = struct{}{}
ranked = append(ranked, rankedExecutionNode{node: node, preferred: true})
}

// Randomly shuffle non-preferred nodes at the end of the ranking.
for _, node := range nodes {
if _, ok := preferredSet[node.GetExecutorID()]; ok {
continue
Expand All @@ -167,15 +177,23 @@ func (tr *taskRouter) MarkComplete(ctx context.Context, cmd *repb.Command, remot
if strategy == nil {
return
}
preferredNodeLimit, routingKey, err := strategy.RoutingInfo(params)
preferredNodeLimit, routingKeys, err := strategy.RoutingInfo(params)
if err != nil {
log.Errorf("Failed to compute routing info: %s", err)
return
} else if len(routingKeys) < 1 {
log.Errorf("No routing keys were generated")
return
}

if preferredNodeLimit == 0 {
return
}

// Routing keys are ranked in order of priority. We only update the
// routing table for the highest priority key.
routingKey := routingKeys[0]

pipe := tr.rdb.TxPipeline()
// Push the node to the head of the list (but first remove it if already
// present to avoid dupes), trim to max length to prevent it from growing
Expand Down Expand Up @@ -233,11 +251,13 @@ type Router interface {
// false otherwise. Note: Applies() must be deterministic.
Applies(params routingParams) bool

// Returns the routing info (preferredNodeLimit and routingKey) for the
// Returns the routing info (preferredNodeLimit and routingKeys) for the
// provided routing parameters. The preferredNodeLimit is the number of
// preferred executor nodes that should be used, and the routing key is the
// Redis key where the list of preferred executor nodes are stored.
RoutingInfo(params routingParams) (int, string, error)
// preferred executor nodes that should be used. The routing keys are
// Redis keys where the list of preferred executor nodes are stored. Keys
// are sorted in order of most preferred to least preferred. That order
// should be preserved when ranking nodes.
RoutingInfo(params routingParams) (int, []string, error)
}

// The runnerRecycler is a router that attempts to "recycle" warm execution
Expand All @@ -255,8 +275,9 @@ func (runnerRecycler) preferredNodeLimit(params routingParams) int {
return defaultPreferredNodeLimit
}

func (runnerRecycler) routingKey(params routingParams) (string, error) {
func (runnerRecycler) routingKeys(params routingParams) ([]string, error) {
parts := []string{"task_route", params.groupID}
keys := make([]string, 0)

if params.remoteInstanceName != "" {
parts = append(parts, params.remoteInstanceName)
Expand All @@ -268,31 +289,39 @@ func (runnerRecycler) routingKey(params routingParams) (string, error) {
}
b, err := proto.Marshal(p)
if err != nil {
return "", status.InternalErrorf("failed to marshal Command: %s", err)
return nil, status.InternalErrorf("failed to marshal Command: %s", err)
}
parts = append(parts, hash.Bytes(b))

// For workflow tasks, route using GIT_BRANCH so that when re-running the
// For workflow tasks, route using git branch name so that when re-running the
// workflow multiple times using the same branch, the runs are more likely
// to hit an executor with a warmer snapshot cache.
if platform.IsCICommand(params.cmd) {
branch := ""
for _, envVar := range params.cmd.EnvironmentVariables {
if envVar.GetName() == "GIT_BRANCH" {
branch = envVar.GetValue()
break
envVarNames := []string{"GIT_BRANCH"}
if *defaultBranchRoutingEnabled {
envVarNames = append(envVarNames, "GIT_BASE_BRANCH", "GIT_REPO_DEFAULT_BRANCH")
}
for _, routingEnvVar := range envVarNames {
for _, envVar := range params.cmd.EnvironmentVariables {
if envVar.GetName() == routingEnvVar {
branch := envVar.GetValue()
keys = append(keys, strings.Join(append(parts, hash.String(branch)), "/"))
}
}
}
parts = append(parts, hash.String(branch))
}

return strings.Join(parts, "/"), nil
if len(keys) == 0 {
keys = append(keys, strings.Join(parts, "/"))
}

return keys, nil
}

func (s runnerRecycler) RoutingInfo(params routingParams) (int, string, error) {
func (s runnerRecycler) RoutingInfo(params routingParams) (int, []string, error) {
nodeLimit := s.preferredNodeLimit(params)
key, err := s.routingKey(params)
return nodeLimit, key, err
keys, err := s.routingKeys(params)
return nodeLimit, keys, err
}

func isWorkflow(cmd *repb.Command) bool {
Expand Down Expand Up @@ -351,10 +380,10 @@ func (affinityRouter) routingKey(params routingParams) (string, error) {
return strings.Join(parts, "/"), nil
}

func (s affinityRouter) RoutingInfo(params routingParams) (int, string, error) {
func (s affinityRouter) RoutingInfo(params routingParams) (int, []string, error) {
nodeLimit := s.preferredNodeLimit(params)
key, err := s.routingKey(params)
return nodeLimit, key, err
return nodeLimit, []string{key}, err
}

func getFirstOutput(cmd *repb.Command) string {
Expand Down
49 changes: 49 additions & 0 deletions enterprise/server/scheduling/task_router/task_router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,55 @@ func TestTaskRouter_WorkflowGitRefRouting(t *testing.T) {
requireNotAlwaysRanked(0, executorID1, t, router, ctx, prBranchCmd, instanceName)
}

// TestTaskRouter_WorkflowGitRefRouting_DefaultRef exercises workflow routing
// with the "executor.workflow_default_branch_routing_enabled" flag turned on.
// It checks two things:
//  1. a command whose GIT_REPO_DEFAULT_BRANCH is "main" is ranked onto the
//     executor that completed a GIT_BRANCH="main" run, and
//  2. once an executor has completed a run for the command's own GIT_BRANCH,
//     that executor is still ranked first even after a different executor
//     completes a "main" run.
func TestTaskRouter_WorkflowGitRefRouting_DefaultRef(t *testing.T) {
	flags.Set(t, "executor.workflow_default_branch_routing_enabled", true)

	const instanceName = ""
	env := newTestEnv(t)
	router := newTaskRouter(t, env)
	nodes := sequentiallyNumberedNodes(100)
	ctx := withAuthUser(t, context.Background(), env, "US1")

	// Both commands in this test are the same recyclable workflow and differ
	// only in their git-related environment variables.
	workflowCmd := func(envVars ...*repb.Command_EnvironmentVariable) *repb.Command {
		return &repb.Command{
			Platform: &repb.Platform{Properties: []*repb.Platform_Property{
				{Name: "recycle-runner", Value: "true"},
				{Name: "workflow-id", Value: "WF123"},
			}},
			EnvironmentVariables: envVars,
			Arguments:            []string{"./buildbuddy_ci_runner"},
		}
	}
	onMain := workflowCmd(
		&repb.Command_EnvironmentVariable{Name: "GIT_BRANCH", Value: "main"},
	)
	onPR := workflowCmd(
		&repb.Command_EnvironmentVariable{Name: "GIT_BRANCH", Value: "my-cool-pr"},
		&repb.Command_EnvironmentVariable{Name: "GIT_REPO_DEFAULT_BRANCH", Value: "main"},
	)

	// topRankedFor returns the executor ID at the head of the ranking for cmd.
	topRankedFor := func(cmd *repb.Command) string {
		return router.RankNodes(ctx, cmd, instanceName, nodes)[0].GetExecutionNode().GetExecutorID()
	}

	// Record a completed "main" branch run on executor1.
	router.MarkComplete(ctx, onMain, instanceName, executorID1)

	// The PR command has never run anywhere, but its default branch matches
	// the branch executor1 ran, so executor1 is expected at the top.
	require.Equal(t, executorID1, topRankedFor(onPR))

	// Record a completed PR branch run on executor1, followed by a "main"
	// branch run on executor2.
	router.MarkComplete(ctx, onPR, instanceName, executorID1)
	router.MarkComplete(ctx, onMain, instanceName, executorID2)

	// The executor that last ran the PR branch itself must outrank the one
	// that last ran the default branch.
	require.Equal(t, executorID1, topRankedFor(onPR))
}

func requireNonePreferred(t *testing.T, rankedNodes []interfaces.RankedExecutionNode) {
for i := 1; i < len(rankedNodes); i++ {
require.False(t, rankedNodes[i].IsPreferred())
Expand Down

0 comments on commit 0dfd136

Please sign in to comment.