Skip to content

Commit e842cf7

Browse files
committed
Add notify() in all the reconcilers
notify() is used to emit events for new artifact and failure recovery scenarios. It's implemented in all the reconcilers. Previously, when there used to be a failure due to any reason, on a subsequent successful reconciliation, no notification was sent to indicate that the failure has been resolved. With notify(), the old version of the object is compared with the new version of the object to determine if all, if any, of the failures have been resolved and a notification is sent. The notification message is the same that's sent in usual successful source reconciliation message about stored artifact. Signed-off-by: Sunny <darkowlzz@protonmail.com>
1 parent 362bc56 commit e842cf7

8 files changed

+656
-35
lines changed

controllers/bucket_controller.go

+54-5
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,12 @@ var bucketReadyCondition = summarize.Conditions{
9999
},
100100
}
101101

102+
// bucketFailConditions contains the conditions that represent failure.
103+
var bucketFailConditions = []string{
104+
sourcev1.FetchFailedCondition,
105+
sourcev1.StorageOperationFailedCondition,
106+
}
107+
102108
// +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=buckets,verbs=get;list;watch;create;update;patch;delete
103109
// +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=buckets/status,verbs=get;update;patch
104110
// +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=buckets/finalizers,verbs=get;create;update;patch;delete
@@ -307,14 +313,16 @@ func (r *BucketReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res
307313
return
308314
}
309315

310-
// reconcile iterates through the gitRepositoryReconcileFunc tasks for the
316+
// reconcile iterates through the bucketReconcileFunc tasks for the
311317
// object. It returns early on the first call that returns
312318
// reconcile.ResultRequeue, or produces an error.
313319
func (r *BucketReconciler) reconcile(ctx context.Context, obj *sourcev1.Bucket, reconcilers []bucketReconcileFunc) (sreconcile.Result, error) {
314320
if obj.Generation != obj.Status.ObservedGeneration {
315321
conditions.MarkReconciling(obj, "NewGeneration", "reconciling new object generation (%d)", obj.Generation)
316322
}
317323

324+
oldObj := obj.DeepCopy()
325+
318326
// Create temp working dir
319327
tmpDir, err := os.MkdirTemp("", fmt.Sprintf("%s-%s-%s-", obj.Kind, obj.Namespace, obj.Name))
320328
if err != nil {
@@ -355,9 +363,54 @@ func (r *BucketReconciler) reconcile(ctx context.Context, obj *sourcev1.Bucket,
355363
// Prioritize requeue request in the result.
356364
res = sreconcile.LowestRequeuingResult(res, recResult)
357365
}
366+
367+
r.notify(oldObj, obj, index, res, resErr)
368+
358369
return res, resErr
359370
}
360371

372+
// notify emits notification related to the reconciliation.
373+
func (r *BucketReconciler) notify(oldObj, newObj *sourcev1.Bucket, index *etagIndex, res sreconcile.Result, resErr error) {
374+
// Notify successful reconciliation for new artifact and recovery from any
375+
// failure.
376+
if resErr == nil && res == sreconcile.ResultSuccess && newObj.Status.Artifact != nil {
377+
annotations := map[string]string{
378+
"revision": newObj.Status.Artifact.Revision,
379+
"checksum": newObj.Status.Artifact.Checksum,
380+
}
381+
382+
var oldChecksum string
383+
if oldObj.GetArtifact() != nil {
384+
oldChecksum = oldObj.GetArtifact().Checksum
385+
}
386+
387+
message := fmt.Sprintf("stored %d files from '%s'", index.Len(), newObj.Spec.BucketName)
388+
389+
if oldChecksum != newObj.GetArtifact().Checksum {
390+
r.AnnotatedEventf(newObj, annotations, corev1.EventTypeNormal,
391+
"NewArtifact", message)
392+
} else {
393+
// If there were failure conditions in the old object but none in
394+
// the new object, emit an event to signal that the failures have
395+
// been resolved.
396+
failuresBefore := 0
397+
failuresNow := 0
398+
for _, failCondition := range bucketFailConditions {
399+
if conditions.Get(oldObj, failCondition) != nil {
400+
failuresBefore++
401+
}
402+
if conditions.Get(newObj, failCondition) != nil {
403+
failuresNow++
404+
}
405+
}
406+
if failuresBefore > 0 && failuresNow == 0 {
407+
r.AnnotatedEventf(newObj, annotations, corev1.EventTypeNormal,
408+
meta.SucceededReason, message)
409+
}
410+
}
411+
}
412+
}
413+
361414
// reconcileStorage ensures the current state of the storage matches the
362415
// desired and previously observed state.
363416
//
@@ -574,10 +627,6 @@ func (r *BucketReconciler) reconcileArtifact(ctx context.Context, obj *sourcev1.
574627
conditions.MarkTrue(obj, sourcev1.StorageOperationFailedCondition, e.Reason, e.Err.Error())
575628
return sreconcile.ResultEmpty, e
576629
}
577-
r.annotatedEventLogf(ctx, obj, map[string]string{
578-
sourcev1.GroupVersion.Group + "/revision": artifact.Revision,
579-
sourcev1.GroupVersion.Group + "/checksum": artifact.Checksum,
580-
}, corev1.EventTypeNormal, "NewArtifact", "fetched %d files from '%s'", index.Len(), obj.Spec.BucketName)
581630

582631
// Record it on the object
583632
obj.Status.Artifact = artifact.DeepCopy()

controllers/bucket_controller_test.go

+115
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package controllers
1818

1919
import (
2020
"context"
21+
"errors"
2122
"fmt"
2223
"net/http"
2324
"net/url"
@@ -1163,3 +1164,117 @@ func TestBucketReconciler_statusConditions(t *testing.T) {
11631164
})
11641165
}
11651166
}
1167+
1168+
func TestBucketReconciler_notify(t *testing.T) {
1169+
tests := []struct {
1170+
name string
1171+
res sreconcile.Result
1172+
resErr error
1173+
oldObjBeforeFunc func(obj *sourcev1.Bucket)
1174+
newObjBeforeFunc func(obj *sourcev1.Bucket)
1175+
wantEvent string
1176+
}{
1177+
{
1178+
name: "error - no event",
1179+
res: sreconcile.ResultEmpty,
1180+
resErr: errors.New("some error"),
1181+
},
1182+
{
1183+
name: "new artifact",
1184+
res: sreconcile.ResultSuccess,
1185+
resErr: nil,
1186+
newObjBeforeFunc: func(obj *sourcev1.Bucket) {
1187+
obj.Status.Artifact = &sourcev1.Artifact{Revision: "xxx", Checksum: "yyy"}
1188+
},
1189+
wantEvent: "Normal NewArtifact stored 2 files from",
1190+
},
1191+
{
1192+
name: "recovery from failure",
1193+
res: sreconcile.ResultSuccess,
1194+
resErr: nil,
1195+
oldObjBeforeFunc: func(obj *sourcev1.Bucket) {
1196+
obj.Status.Artifact = &sourcev1.Artifact{Revision: "xxx", Checksum: "yyy"}
1197+
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.GitOperationFailedReason, "fail")
1198+
conditions.MarkFalse(obj, meta.ReadyCondition, meta.FailedReason, "foo")
1199+
},
1200+
newObjBeforeFunc: func(obj *sourcev1.Bucket) {
1201+
obj.Status.Artifact = &sourcev1.Artifact{Revision: "xxx", Checksum: "yyy"}
1202+
conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "ready")
1203+
},
1204+
wantEvent: "Normal Succeeded stored 2 files from",
1205+
},
1206+
{
1207+
name: "recovery and new artifact",
1208+
res: sreconcile.ResultSuccess,
1209+
resErr: nil,
1210+
oldObjBeforeFunc: func(obj *sourcev1.Bucket) {
1211+
obj.Status.Artifact = &sourcev1.Artifact{Revision: "xxx", Checksum: "yyy"}
1212+
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.GitOperationFailedReason, "fail")
1213+
conditions.MarkFalse(obj, meta.ReadyCondition, meta.FailedReason, "foo")
1214+
},
1215+
newObjBeforeFunc: func(obj *sourcev1.Bucket) {
1216+
obj.Status.Artifact = &sourcev1.Artifact{Revision: "aaa", Checksum: "bbb"}
1217+
conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "ready")
1218+
},
1219+
wantEvent: "Normal NewArtifact stored 2 files from",
1220+
},
1221+
{
1222+
name: "no updates",
1223+
res: sreconcile.ResultSuccess,
1224+
resErr: nil,
1225+
oldObjBeforeFunc: func(obj *sourcev1.Bucket) {
1226+
obj.Status.Artifact = &sourcev1.Artifact{Revision: "xxx", Checksum: "yyy"}
1227+
conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "ready")
1228+
},
1229+
newObjBeforeFunc: func(obj *sourcev1.Bucket) {
1230+
obj.Status.Artifact = &sourcev1.Artifact{Revision: "xxx", Checksum: "yyy"}
1231+
conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "ready")
1232+
},
1233+
},
1234+
}
1235+
1236+
for _, tt := range tests {
1237+
t.Run(tt.name, func(t *testing.T) {
1238+
g := NewWithT(t)
1239+
1240+
recorder := record.NewFakeRecorder(32)
1241+
1242+
oldObj := &sourcev1.Bucket{
1243+
Spec: sourcev1.BucketSpec{
1244+
BucketName: "test-bucket",
1245+
},
1246+
}
1247+
newObj := oldObj.DeepCopy()
1248+
1249+
if tt.oldObjBeforeFunc != nil {
1250+
tt.oldObjBeforeFunc(oldObj)
1251+
}
1252+
if tt.newObjBeforeFunc != nil {
1253+
tt.newObjBeforeFunc(newObj)
1254+
}
1255+
1256+
reconciler := &BucketReconciler{
1257+
EventRecorder: recorder,
1258+
}
1259+
index := &etagIndex{
1260+
index: map[string]string{
1261+
"zzz": "qqq",
1262+
"bbb": "ddd",
1263+
},
1264+
}
1265+
reconciler.notify(oldObj, newObj, index, tt.res, tt.resErr)
1266+
1267+
select {
1268+
case x, ok := <-recorder.Events:
1269+
g.Expect(ok).To(Equal(tt.wantEvent != ""), "unexpected event received")
1270+
if tt.wantEvent != "" {
1271+
g.Expect(x).To(ContainSubstring(tt.wantEvent))
1272+
}
1273+
default:
1274+
if tt.wantEvent != "" {
1275+
t.Errorf("expected some event to be emitted")
1276+
}
1277+
}
1278+
})
1279+
}
1280+
}

controllers/gitrepository_controller.go

+54-4
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,13 @@ var gitRepositoryReadyCondition = summarize.Conditions{
9191
},
9292
}
9393

94+
// gitRepositoryFailConditions contains the conditions that represent failure.
95+
var gitRepositoryFailConditions = []string{
96+
sourcev1.FetchFailedCondition,
97+
sourcev1.IncludeUnavailableCondition,
98+
sourcev1.StorageOperationFailedCondition,
99+
}
100+
94101
// +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=gitrepositories,verbs=get;list;watch;create;update;patch;delete
95102
// +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=gitrepositories/status,verbs=get;update;patch
96103
// +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=gitrepositories/finalizers,verbs=get;create;update;patch;delete
@@ -217,6 +224,8 @@ func (r *GitRepositoryReconciler) reconcile(ctx context.Context, obj *sourcev1.G
217224
conditions.MarkReconciling(obj, "NewGeneration", "reconciling new object generation (%d)", obj.Generation)
218225
}
219226

227+
oldObj := obj.DeepCopy()
228+
220229
// Create temp dir for Git clone
221230
tmpDir, err := util.TempDirForObj("", obj)
222231
if err != nil {
@@ -258,9 +267,54 @@ func (r *GitRepositoryReconciler) reconcile(ctx context.Context, obj *sourcev1.G
258267
// Prioritize requeue request in the result.
259268
res = sreconcile.LowestRequeuingResult(res, recResult)
260269
}
270+
271+
r.notify(oldObj, obj, commit, res, resErr)
272+
261273
return res, resErr
262274
}
263275

276+
// notify emits notification related to the reconciliation.
277+
func (r *GitRepositoryReconciler) notify(oldObj, newObj *sourcev1.GitRepository, commit git.Commit, res sreconcile.Result, resErr error) {
278+
// Notify successful reconciliation for new artifact and recovery from any
279+
// failure.
280+
if resErr == nil && res == sreconcile.ResultSuccess && newObj.Status.Artifact != nil {
281+
annotations := map[string]string{
282+
"revision": newObj.Status.Artifact.Revision,
283+
"checksum": newObj.Status.Artifact.Checksum,
284+
}
285+
286+
var oldChecksum string
287+
if oldObj.GetArtifact() != nil {
288+
oldChecksum = oldObj.GetArtifact().Checksum
289+
}
290+
291+
message := fmt.Sprintf("stored artifact for commit '%s'", commit.ShortMessage())
292+
293+
if oldChecksum != newObj.GetArtifact().Checksum {
294+
r.AnnotatedEventf(newObj, annotations, corev1.EventTypeNormal,
295+
"NewArtifact", message)
296+
} else {
297+
// If there were failure conditions in the old object but none in
298+
// the new object, emit an event to signal that the failures have
299+
// been resolved.
300+
failuresBefore := 0
301+
failuresNow := 0
302+
for _, failCondition := range gitRepositoryFailConditions {
303+
if conditions.Get(oldObj, failCondition) != nil {
304+
failuresBefore++
305+
}
306+
if conditions.Get(newObj, failCondition) != nil {
307+
failuresNow++
308+
}
309+
}
310+
if failuresBefore > 0 && failuresNow == 0 {
311+
r.AnnotatedEventf(newObj, annotations, corev1.EventTypeNormal,
312+
meta.SucceededReason, message)
313+
}
314+
}
315+
}
316+
}
317+
264318
// reconcileStorage ensures the current state of the storage matches the
265319
// desired and previously observed state.
266320
//
@@ -523,10 +577,6 @@ func (r *GitRepositoryReconciler) reconcileArtifact(ctx context.Context,
523577
conditions.MarkTrue(obj, sourcev1.StorageOperationFailedCondition, e.Reason, e.Err.Error())
524578
return sreconcile.ResultEmpty, e
525579
}
526-
r.AnnotatedEventf(obj, map[string]string{
527-
sourcev1.GroupVersion.Group + "/revision": artifact.Revision,
528-
sourcev1.GroupVersion.Group + "/checksum": artifact.Checksum,
529-
}, corev1.EventTypeNormal, "NewArtifact", "stored artifact for commit '%s'", commit.ShortMessage())
530580

531581
// Record it on the object
532582
obj.Status.Artifact = artifact.DeepCopy()

0 commit comments

Comments
 (0)