Merge pull request kubernetes#42097 from kargakis/address-mismatched-available-replicas

Automatic merge from submit-queue

Enqueue controllers after MinReadySeconds when all pods are ready

@janetkuo this should address kubernetes#41697 (comment). This is impossible to unit test, but it should stabilize some of our deployment e2e tests that occasionally fail because availableReplicas is not updated.

It should also fix kubernetes#41641

Eventually I would like AddAfter to be able to cancel previous invocations of the same key, so I opened kubernetes/client-go#131.

@kubernetes/sig-apps-bugs
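
For readers unfamiliar with the delayed requeue this change relies on, here is a minimal, self-contained sketch of the workqueue.AddAfter pattern: the controller re-adds the same key after MinReadySeconds so the next sync can recompute availableReplicas. This is illustrative only and not code from this PR; the key name, the delay value, and the standalone main are assumptions.

```go
package main

import (
	"fmt"
	"time"

	"k8s.io/client-go/util/workqueue"
)

func main() {
	// Rate-limited work queue of the kind the ReplicaSet/RC controllers use.
	queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
	defer queue.ShutDown()

	key := "default/my-replicaset" // hypothetical namespace/name key

	// Normal event path: enqueue immediately.
	queue.Add(key)

	// The pattern added by this PR: re-enqueue the same key after
	// MinReadySeconds so the controller gets one more sync once ready
	// pods have had time to become available.
	minReadySeconds := int32(10)
	queue.AddAfter(key, time.Duration(minReadySeconds)*time.Second)

	// A worker would normally loop over Get/Done; one iteration shown here.
	item, shutdown := queue.Get()
	if !shutdown {
		fmt.Println("syncing", item)
		queue.Done(item)
	}
}
```

Note that AddAfter does not cancel earlier timers for the same key, which is what kubernetes/client-go#131 asks for; the queue only deduplicates keys that are already waiting to be processed.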
Kubernetes Submit Queue committed Feb 28, 2017
2 parents 9a1f057 + 9eab226 commit 2681e38
Showing 4 changed files with 47 additions and 20 deletions.
9 changes: 8 additions & 1 deletion pkg/controller/replicaset/replica_set.go
@@ -615,10 +615,17 @@ func (rsc *ReplicaSetController) syncReplicaSet(key string) error {
newStatus := calculateStatus(rs, filteredPods, manageReplicasErr)

// Always updates status as pods come up or die.
if err := updateReplicaSetStatus(rsc.kubeClient.Extensions().ReplicaSets(rs.Namespace), rs, newStatus); err != nil {
updatedRS, err := updateReplicaSetStatus(rsc.kubeClient.Extensions().ReplicaSets(rs.Namespace), rs, newStatus)
if err != nil {
// Multiple things could lead to this update failing. Requeuing the replica set ensures
// it is retried; returning an error causes a requeue without forcing a hotloop
return err
}
// Resync the ReplicaSet after MinReadySeconds as a last line of defense to guard against clock-skew.
if manageReplicasErr == nil && updatedRS.Spec.MinReadySeconds > 0 &&
updatedRS.Status.ReadyReplicas == *(updatedRS.Spec.Replicas) &&
updatedRS.Status.AvailableReplicas != *(updatedRS.Spec.Replicas) {
rsc.enqueueReplicaSetAfter(updatedRS, time.Duration(updatedRS.Spec.MinReadySeconds)*time.Second)
}
return manageReplicasErr
}
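
To make the intent of the hunk above easier to read outside the diff, the sketch below restates the new guard as a standalone helper: only when pod management succeeded (manageReplicasErr == nil), MinReadySeconds is set, every replica is Ready, and not every replica is Available yet does the controller schedule one more sync after MinReadySeconds. The struct and function names are illustrative stand-ins; the real code operates on *extensions.ReplicaSet and calls rsc.enqueueReplicaSetAfter directly.

```go
package main

import (
	"fmt"
	"time"
)

// replicaCounts is a stand-in for the handful of ReplicaSet fields the new
// guard reads; the real controller uses *extensions.ReplicaSet.
type replicaCounts struct {
	specReplicas      int32
	minReadySeconds   int32
	readyReplicas     int32
	availableReplicas int32
}

// resyncDelay mirrors the condition added above: return the delay after which
// the object should be re-enqueued, and whether a resync is needed at all.
func resyncDelay(rs replicaCounts, manageReplicasErr error) (time.Duration, bool) {
	if manageReplicasErr == nil && rs.minReadySeconds > 0 &&
		rs.readyReplicas == rs.specReplicas &&
		rs.availableReplicas != rs.specReplicas {
		return time.Duration(rs.minReadySeconds) * time.Second, true
	}
	return 0, false
}

func main() {
	d, ok := resyncDelay(replicaCounts{specReplicas: 3, minReadySeconds: 10, readyReplicas: 3, availableReplicas: 1}, nil)
	fmt.Println(ok, d) // true 10s
}
```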
25 changes: 16 additions & 9 deletions pkg/controller/replicaset/replica_set_utils.go
@@ -33,7 +33,7 @@ import (
)

// updateReplicaSetStatus attempts to update the Status.Replicas of the given ReplicaSet, with a single GET/PUT retry.
func updateReplicaSetStatus(c unversionedextensions.ReplicaSetInterface, rs *extensions.ReplicaSet, newStatus extensions.ReplicaSetStatus) (updateErr error) {
func updateReplicaSetStatus(c unversionedextensions.ReplicaSetInterface, rs *extensions.ReplicaSet, newStatus extensions.ReplicaSetStatus) (*extensions.ReplicaSet, error) {
// This is the steady state. It happens when the ReplicaSet doesn't have any expectations, since
// we do a periodic relist every 30s. If the generations differ but the replicas are
// the same, a caller might've resized to the same replica count.
@@ -43,14 +43,14 @@ func updateReplicaSetStatus(c unversionedextensions.ReplicaSetInterface, rs *ext
rs.Status.AvailableReplicas == newStatus.AvailableReplicas &&
rs.Generation == rs.Status.ObservedGeneration &&
reflect.DeepEqual(rs.Status.Conditions, newStatus.Conditions) {
return nil
return rs, nil
}

// deep copy to avoid mutation now.
// TODO this method needs some work. Retry on conflict probably, though I suspect this is stomping status to something it probably shouldn't
copyObj, err := api.Scheme.DeepCopy(rs)
if err != nil {
return err
return nil, err
}
rs = copyObj.(*extensions.ReplicaSet)

@@ -60,27 +60,34 @@ func updateReplicaSetStatus(c unversionedextensions.ReplicaSetInterface, rs *ext
// same status.
newStatus.ObservedGeneration = rs.Generation

var getErr error
var getErr, updateErr error
var updatedRS *extensions.ReplicaSet
for i, rs := 0, rs; ; i++ {
glog.V(4).Infof(fmt.Sprintf("Updating replica count for ReplicaSet: %s/%s, ", rs.Namespace, rs.Name) +
glog.V(4).Infof(fmt.Sprintf("Updating status for ReplicaSet: %s/%s, ", rs.Namespace, rs.Name) +
fmt.Sprintf("replicas %d->%d (need %d), ", rs.Status.Replicas, newStatus.Replicas, *(rs.Spec.Replicas)) +
fmt.Sprintf("fullyLabeledReplicas %d->%d, ", rs.Status.FullyLabeledReplicas, newStatus.FullyLabeledReplicas) +
fmt.Sprintf("readyReplicas %d->%d, ", rs.Status.ReadyReplicas, newStatus.ReadyReplicas) +
fmt.Sprintf("availableReplicas %d->%d, ", rs.Status.AvailableReplicas, newStatus.AvailableReplicas) +
fmt.Sprintf("sequence No: %v->%v", rs.Status.ObservedGeneration, newStatus.ObservedGeneration))

rs.Status = newStatus
_, updateErr = c.UpdateStatus(rs)
if updateErr == nil || i >= statusUpdateRetries {
return updateErr
updatedRS, updateErr = c.UpdateStatus(rs)
if updateErr == nil {
return updatedRS, nil
}
// Stop retrying if we exceed statusUpdateRetries - the replicaSet will be requeued with a rate limit.
if i >= statusUpdateRetries {
break
}
// Update the ReplicaSet with the latest resource version for the next poll
if rs, getErr = c.Get(rs.Name, metav1.GetOptions{}); getErr != nil {
// If the GET fails we can't trust status.Replicas anymore. This error
// is bound to be more interesting than the update failure.
return getErr
return nil, getErr
}
}

return nil, updateErr
}

func calculateStatus(rs *extensions.ReplicaSet, filteredPods []*v1.Pod, manageReplicasErr error) extensions.ReplicaSetStatus {
10 changes: 8 additions & 2 deletions pkg/controller/replication/replication_controller.go
@@ -633,10 +633,16 @@ func (rm *ReplicationManager) syncReplicationController(key string) error {
newStatus := calculateStatus(rc, filteredPods, manageReplicasErr)

// Always updates status as pods come up or die.
if err := updateReplicationControllerStatus(rm.kubeClient.Core().ReplicationControllers(rc.Namespace), *rc, newStatus); err != nil {
updatedRC, err := updateReplicationControllerStatus(rm.kubeClient.Core().ReplicationControllers(rc.Namespace), *rc, newStatus)
if err != nil {
// Multiple things could lead to this update failing. Returning an error causes a requeue without forcing a hotloop
return err
}

// Resync the ReplicationController after MinReadySeconds as a last line of defense to guard against clock-skew.
if manageReplicasErr == nil && updatedRC.Spec.MinReadySeconds > 0 &&
updatedRC.Status.ReadyReplicas == *(updatedRC.Spec.Replicas) &&
updatedRC.Status.AvailableReplicas != *(updatedRC.Spec.Replicas) {
rm.enqueueControllerAfter(updatedRC, time.Duration(updatedRC.Spec.MinReadySeconds)*time.Second)
}
return manageReplicasErr
}
23 changes: 15 additions & 8 deletions pkg/controller/replication/replication_controller_utils.go
@@ -30,7 +30,7 @@ import (
)

// updateReplicationControllerStatus attempts to update the Status.Replicas of the given controller, with a single GET/PUT retry.
func updateReplicationControllerStatus(c v1core.ReplicationControllerInterface, rc v1.ReplicationController, newStatus v1.ReplicationControllerStatus) (updateErr error) {
func updateReplicationControllerStatus(c v1core.ReplicationControllerInterface, rc v1.ReplicationController, newStatus v1.ReplicationControllerStatus) (*v1.ReplicationController, error) {
// This is the steady state. It happens when the rc doesn't have any expectations, since
// we do a periodic relist every 30s. If the generations differ but the replicas are
// the same, a caller might've resized to the same replica count.
@@ -40,35 +40,42 @@ func updateReplicationControllerStatus(c v1core.ReplicationControllerInterface,
rc.Status.AvailableReplicas == newStatus.AvailableReplicas &&
rc.Generation == rc.Status.ObservedGeneration &&
reflect.DeepEqual(rc.Status.Conditions, newStatus.Conditions) {
return nil
return &rc, nil
}
// Save the generation number we acted on, otherwise we might wrongfully indicate
// that we've seen a spec update when we retry.
// TODO: This can clobber an update if we allow multiple agents to write to the
// same status.
newStatus.ObservedGeneration = rc.Generation

var getErr error
var getErr, updateErr error
var updatedRC *v1.ReplicationController
for i, rc := 0, &rc; ; i++ {
glog.V(4).Infof(fmt.Sprintf("Updating replica count for rc: %s/%s, ", rc.Namespace, rc.Name) +
glog.V(4).Infof(fmt.Sprintf("Updating status for rc: %s/%s, ", rc.Namespace, rc.Name) +
fmt.Sprintf("replicas %d->%d (need %d), ", rc.Status.Replicas, newStatus.Replicas, *(rc.Spec.Replicas)) +
fmt.Sprintf("fullyLabeledReplicas %d->%d, ", rc.Status.FullyLabeledReplicas, newStatus.FullyLabeledReplicas) +
fmt.Sprintf("readyReplicas %d->%d, ", rc.Status.ReadyReplicas, newStatus.ReadyReplicas) +
fmt.Sprintf("availableReplicas %d->%d, ", rc.Status.AvailableReplicas, newStatus.AvailableReplicas) +
fmt.Sprintf("sequence No: %v->%v", rc.Status.ObservedGeneration, newStatus.ObservedGeneration))

rc.Status = newStatus
_, updateErr = c.UpdateStatus(rc)
if updateErr == nil || i >= statusUpdateRetries {
return updateErr
updatedRC, updateErr = c.UpdateStatus(rc)
if updateErr == nil {
return updatedRC, nil
}
// Stop retrying if we exceed statusUpdateRetries - the replicationController will be requeued with a rate limit.
if i >= statusUpdateRetries {
break
}
// Update the controller with the latest resource version for the next poll
if rc, getErr = c.Get(rc.Name, metav1.GetOptions{}); getErr != nil {
// If the GET fails we can't trust status.Replicas anymore. This error
// is bound to be more interesting than the update failure.
return getErr
return nil, getErr
}
}

return nil, updateErr
}

// OverlappingControllers sorts a list of controllers by creation timestamp, using their names as a tie breaker.
