Skip to content

Commit 3955082

Browse files
mayabarvMaroon
authored andcommitted
Separate code of Scorer interface and scorer implementations + add scorerManager
1 parent d726f54 commit 3955082

File tree

2 files changed

+108
-25
lines changed

2 files changed

+108
-25
lines changed

pkg/epp/scheduling/scorer.go

+49-25
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@ limitations under the License.
1717
package scheduling
1818

1919
import (
20+
"math/rand/v2"
21+
22+
"sigs.k8s.io/controller-runtime/pkg/log"
2023
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types"
2124
)
2225

@@ -30,41 +33,62 @@ type Scorer interface {
3033
ScoreTargets(ctx *types.Context, pods []*types.PodMetrics, req *types.LLMRequest) ([]PodScore, error)
3134
}
3235

33-
// sessionAffinity is a routing scorer that routes subsequent
34-
// requests in a session to the same pod as the first request in the
35-
// session was sent to, by giving that pod the specified weight and assigning
36-
// zero score to the rest of the targets
37-
type SessionAffinityScorer struct {
38-
weight float64
39-
datastore Datastore
36+
// Scorer is the interface that scorers must implement
37+
type ScorerMng struct {
38+
scorers []Scorer
4039
}
4140

42-
func NewSessionAffinityScorer(weight float64, datastore Datastore) Scorer {
43-
return SessionAffinityScorer{
44-
weight: weight,
45-
datastore: datastore,
41+
func NewScorerMng() *ScorerMng {
42+
return &ScorerMng{
43+
scorers: make([]Scorer, 0),
4644
}
4745
}
4846

49-
// ScoreTargets does the actual scoring of the target pods by the session affinity.
50-
func (s SessionAffinityScorer) ScoreTargets(ctx *types.Context, pods []*types.PodMetrics, req *types.LLMRequest) ([]PodScore, error) {
51-
scoredPods := make([]PodScore, len(pods))
52-
selectedPodFullName := ""
47+
func (sm *ScorerMng) addScorer(scorer Scorer) {
48+
sm.scorers = append(sm.scorers, scorer)
49+
}
50+
51+
func (sm *ScorerMng) scoreTargets(ctx *types.Context, pods []*types.PodMetrics, req *types.LLMRequest) (*types.PodMetrics, error) {
52+
logger := log.FromContext(ctx)
53+
54+
podsTotalScore := make(map[*types.PodMetrics]float64)
55+
56+
// initialize zero score for all pods
57+
for _, pod := range pods {
58+
podsTotalScore[pod] = 0.0
59+
}
60+
61+
// add scores from all scorers
62+
for _, scorer := range sm.scorers {
63+
scoredPods, err := scorer.ScoreTargets(ctx, pods, req)
64+
if err != nil {
65+
logger.Info(">>> In scoreTargets, score targets returned error", "error", err)
66+
return nil, err
67+
}
5368

54-
if req.SessionID != "" {
55-
selectedPod := s.datastore.GetPodForSession(req.SessionID)
56-
if selectedPod != nil {
57-
selectedPodFullName = selectedPod.NamespacedName.String()
69+
for _, scoredPod := range scoredPods {
70+
podsTotalScore[scoredPod.Pod] += scoredPod.Score
5871
}
5972
}
6073

61-
// session is not defined - no score for all pods
62-
for i, pod := range pods {
63-
if selectedPodFullName == pod.NamespacedName.String() {
64-
scoredPods[i].Score = s.weight
74+
// select pod with maximum score, if more than one with the max score - use random pods from the list
75+
var highestScoreTargets []*types.PodMetrics
76+
maxScore := -1.0
77+
78+
for pod, score := range podsTotalScore {
79+
if score > maxScore {
80+
maxScore = score
81+
highestScoreTargets = []*types.PodMetrics{pod}
82+
} else if score == maxScore {
83+
highestScoreTargets = append(highestScoreTargets, pod)
6584
}
66-
scoredPods[i].Pod = pod
6785
}
6886

69-
return scoredPods, nil
87+
// single pod with max score
88+
if len(highestScoreTargets) == 1 {
89+
return highestScoreTargets[0], nil
90+
}
91+
92+
// select random pod from list of pods with max score
93+
return highestScoreTargets[rand.IntN(len(highestScoreTargets))], nil
7094
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
package scheduling
17+
18+
import (
19+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types"
20+
)
21+
22+
// sessionAffinity is a routing scorer that routes subsequent
23+
// requests in a session to the same pod as the first request in the
24+
// session was sent to, by giving that pod the specified weight and assigning
25+
// zero score to the rest of the targets
26+
type SessionAffinityScorer struct {
27+
weight float64
28+
datastore Datastore
29+
}
30+
31+
func NewSessionAffinityScorer(weight float64, datastore Datastore) Scorer {
32+
return SessionAffinityScorer{
33+
weight: weight,
34+
datastore: datastore,
35+
}
36+
}
37+
38+
// ScoreTargets does the actual scoring of the target pods by the session affinity.
39+
func (s SessionAffinityScorer) ScoreTargets(ctx *types.Context, pods []*types.PodMetrics, req *types.LLMRequest) ([]PodScore, error) {
40+
scoredPods := make([]PodScore, len(pods))
41+
selectedPodFullName := ""
42+
43+
if req.SessionID != "" {
44+
selectedPod := s.datastore.GetPodForSession(req.SessionID)
45+
if selectedPod != nil {
46+
selectedPodFullName = selectedPod.NamespacedName.String()
47+
}
48+
}
49+
50+
// session is not defined - no score for all pods
51+
for i, pod := range pods {
52+
if selectedPodFullName == pod.NamespacedName.String() {
53+
scoredPods[i].Score = s.weight
54+
}
55+
scoredPods[i].Pod = pod
56+
}
57+
58+
return scoredPods, nil
59+
}

0 commit comments

Comments
 (0)