8
8
"context"
9
9
"fmt"
10
10
"reflect"
11
+ "regexp"
11
12
"slices"
12
13
"sort"
13
14
"strconv"
@@ -103,6 +104,142 @@ func NewClusterWatcher(client *kubernetes.Clientset, log *zap.SugaredLogger, ext
103
104
}
104
105
}
105
106
107
// volumeNameRe captures the double-quoted volume name that follows the word
// "volume" in an event message, for example:
//
//	MountVolume.SetUp failed for volume "notexists-path"
//
// It is compiled once at package scope so that extractVolumeFromMessage does
// not pay for a regexp compilation on every call.
var volumeNameRe = regexp.MustCompile(`volume\s*\"([^\"]+)\"`)

// extractVolumeFromMessage returns the volume name that appears in double
// quotes right after the word "volume" in message.
//
// The boolean result reports whether such a name was found; when it is false
// the returned string is empty.
func extractVolumeFromMessage(message string) (string, bool) {
	matches := volumeNameRe.FindStringSubmatch(message)
	// matches[0] is the whole match, matches[1] the captured name.
	if len(matches) > 1 {
		return matches[1], true
	}
	return "", false
}
118
+
119
// mkdirPathRe captures the double-quoted path that follows the word "mkdir"
// in an event message, for example:
//
//	failed to mkdir "/etc/apparmor.d/": mkdir /etc/apparmor.d/: read-only file system
//
// It is compiled once at package scope so that extractPathFromMessage does
// not pay for a regexp compilation on every call.
var mkdirPathRe = regexp.MustCompile(`mkdir\s+\"([^\"]+)\"`)

// extractPathFromMessage returns the path that appears in double quotes after
// the word "mkdir" (separated by at least one whitespace character) in
// message.
//
// The boolean result reports whether such a path was found; when it is false
// the returned string is empty.
func extractPathFromMessage(message string) (string, bool) {
	matches := mkdirPathRe.FindStringSubmatch(message)
	// matches[0] is the whole match, matches[1] the captured path.
	if len(matches) > 1 {
		return matches[1], true
	}
	return "", false
}
130
+
131
+ func (clusterWatcher * ClusterWatcher ) checkJobStatus (job , runtime , nodename string ) {
132
+ defer func () {
133
+ clusterWatcher .Log .Infof ("checkJobStatus completed for job: %s" , job )
134
+ }()
135
+
136
+ for {
137
+ select {
138
+ case <- time .After (5 * time .Minute ):
139
+ clusterWatcher .Log .Infof ("watcher exit after timeout for job: %s" , job )
140
+ return
141
+ default :
142
+ clusterWatcher .Log .Infof ("watching status for job: %s" , job )
143
+
144
+ j , err := clusterWatcher .Client .BatchV1 ().Jobs (common .Namespace ).Get (context .TODO (), job , v1.GetOptions {})
145
+ if err != nil {
146
+ clusterWatcher .Log .Warnf ("cannot get job: %s" , job )
147
+ return
148
+ }
149
+
150
+ if j .Status .Succeeded > 0 {
151
+ return
152
+ }
153
+
154
+ podsList , err := clusterWatcher .Client .CoreV1 ().Pods (common .Namespace ).List (context .TODO (), v1.ListOptions {
155
+ LabelSelector : fmt .Sprintf ("job-name=%s" , job ),
156
+ })
157
+
158
+ if err != nil {
159
+ clusterWatcher .Log .Warnf ("Cannot get job pod: %s" , job )
160
+ return
161
+ }
162
+
163
+ for _ , pod := range podsList .Items {
164
+ mountFailure := false
165
+ failedMount := ""
166
+ events , err := clusterWatcher .Client .CoreV1 ().Events (common .Namespace ).List (context .TODO (), v1.ListOptions {
167
+ FieldSelector : fmt .Sprintf ("involvedObject.name=%s" , pod .Name ),
168
+ })
169
+ if err != nil {
170
+ clusterWatcher .Log .Warnf ("cannot get pod events for pod: %s" , pod .Name )
171
+ return
172
+ }
173
+
174
+ for _ , event := range events .Items {
175
+ if event .Type == "Warning" && (event .Reason == "FailedMount" ||
176
+ event .Reason == "FailedAttachVolume" ||
177
+ event .Reason == "VolumeMountsFailed" ) {
178
+ clusterWatcher .Log .Infof ("Got Failed Event for job pod: %v" , event .Message )
179
+ mountFailure = true
180
+ failedMount , _ = extractVolumeFromMessage (event .Message )
181
+ clusterWatcher .Log .Infof ("FailedMount: %s" , failedMount )
182
+ break
183
+ }
184
+
185
+ if event .Type == "Warning" && event .Reason == "Failed" && strings .Contains (event .Message , "mkdir" ) {
186
+ clusterWatcher .Log .Infof ("Got Failed Event for job pod: %v" , event .Message )
187
+ if path , readOnly := extractPathFromMessage (event .Message ); readOnly {
188
+ failedMount = path
189
+ mountFailure = true
190
+ clusterWatcher .Log .Infof ("ReadOnly FS: %s" , failedMount )
191
+ break
192
+ }
193
+ }
194
+ }
195
+
196
+ if mountFailure {
197
+ propogatePodDeletion := v1 .DeletePropagationBackground
198
+ err := clusterWatcher .Client .BatchV1 ().Jobs (common .Namespace ).Delete (context .TODO (), job , v1.DeleteOptions {
199
+ PropagationPolicy : & propogatePodDeletion ,
200
+ })
201
+ if err != nil {
202
+ clusterWatcher .Log .Warnf ("Cannot delete job: %s, err=%s" , job , err )
203
+ return
204
+ }
205
+
206
+ newJob := deploySnitch (nodename , runtime )
207
+
208
+ volumeToDelete := ""
209
+ for _ , vol := range newJob .Spec .Template .Spec .Volumes {
210
+ if vol .HostPath .Path == failedMount || vol .Name == failedMount {
211
+ volumeToDelete = vol .Name
212
+ break
213
+ }
214
+ }
215
+
216
+ newJob .Spec .Template .Spec .Volumes = slices .DeleteFunc (newJob .Spec .Template .Spec .Volumes , func (vol corev1.Volume ) bool {
217
+ if vol .Name == volumeToDelete {
218
+ return true
219
+ }
220
+ return false
221
+ })
222
+
223
+ newJob .Spec .Template .Spec .Containers [0 ].VolumeMounts = slices .DeleteFunc (newJob .Spec .Template .Spec .Containers [0 ].VolumeMounts , func (volMount corev1.VolumeMount ) bool {
224
+ if volMount .Name == volumeToDelete {
225
+ return true
226
+ }
227
+ return false
228
+ })
229
+
230
+ newJ , err := clusterWatcher .Client .BatchV1 ().Jobs (common .Namespace ).Create (context .TODO (), newJob , v1.CreateOptions {})
231
+ if err != nil {
232
+ clusterWatcher .Log .Warnf ("Cannot create job: %s, error=%s" , newJob .Name , err )
233
+ return
234
+ }
235
+ job = newJ .Name
236
+ break
237
+ }
238
+ }
239
+ }
240
+ }
241
+ }
242
+
106
243
func (clusterWatcher * ClusterWatcher ) WatchNodes () {
107
244
log := clusterWatcher .Log
108
245
nodeInformer := informer .Core ().V1 ().Nodes ().Informer ()
@@ -113,12 +250,13 @@ func (clusterWatcher *ClusterWatcher) WatchNodes() {
113
250
runtime = strings .Split (runtime , ":" )[0 ]
114
251
if val , ok := node .Labels [common .OsLabel ]; ok && val == "linux" {
115
252
log .Infof ("Installing snitch on node %s" , node .Name )
116
- _ , err := clusterWatcher .Client .BatchV1 ().Jobs (common .Namespace ).Create (context .Background (), deploySnitch (node .Name , runtime ), v1.CreateOptions {})
253
+ snitchJob , err := clusterWatcher .Client .BatchV1 ().Jobs (common .Namespace ).Create (context .Background (), deploySnitch (node .Name , runtime ), v1.CreateOptions {})
117
254
if err != nil {
118
255
log .Errorf ("Cannot run snitch on node %s, error=%s" , node .Name , err .Error ())
119
256
return
120
257
}
121
258
log .Infof ("Snitch was installed on node %s" , node .Name )
259
+ go clusterWatcher .checkJobStatus (snitchJob .Name , runtime , node .Name )
122
260
}
123
261
}
124
262
},
@@ -136,12 +274,13 @@ func (clusterWatcher *ClusterWatcher) WatchNodes() {
136
274
clusterWatcher .Log .Infof ("Node might have been restarted, redeploying snitch " )
137
275
if val , ok := node .Labels [common .OsLabel ]; ok && val == "linux" {
138
276
log .Infof ("Installing snitch on node %s" , node .Name )
139
- _ , err := clusterWatcher .Client .BatchV1 ().Jobs (common .Namespace ).Create (context .Background (), deploySnitch (node .Name , runtime ), v1.CreateOptions {})
277
+ snitchJob , err := clusterWatcher .Client .BatchV1 ().Jobs (common .Namespace ).Create (context .Background (), deploySnitch (node .Name , runtime ), v1.CreateOptions {})
140
278
if err != nil {
141
279
log .Errorf ("Cannot run snitch on node %s, error=%s" , node .Name , err .Error ())
142
280
return
143
281
}
144
282
log .Infof ("Snitch was installed on node %s" , node .Name )
283
+ go clusterWatcher .checkJobStatus (snitchJob .Name , runtime , node .Name )
145
284
}
146
285
}
147
286
}
@@ -788,14 +927,14 @@ func (clusterWatcher *ClusterWatcher) UpdateCrdStatus(cfg, phase, message string
788
927
// retry the update
789
928
return false , nil
790
929
}
930
+ clusterWatcher .Log .Info ("Config CR Status Updated Successfully" )
791
931
}
792
932
return true , nil
793
933
})
794
934
if err != nil {
795
935
clusterWatcher .Log .Errorf ("Error updating the ConfigCR status %s" , err )
796
936
return
797
937
}
798
- clusterWatcher .Log .Info ("Config CR Status Updated Successfully" )
799
938
}
800
939
801
940
func (clusterWatcher * ClusterWatcher ) UpdateKubeArmorConfigMap (cfg * opv1.KubeArmorConfig ) {
@@ -1002,19 +1141,19 @@ func (clusterWatcher *ClusterWatcher) WatchRecommendedPolicies() error {
1002
1141
var yamlBytes []byte
1003
1142
policies , err := recommend .CRDFs .ReadDir ("." )
1004
1143
if err != nil {
1005
- clusterWatcher .Log .Warnf ("error reading policies FS" , err )
1144
+ clusterWatcher .Log .Warnf ("error reading policies FS %s " , err )
1006
1145
return err
1007
1146
}
1008
1147
for _ , policy := range policies {
1009
1148
csp := & secv1.KubeArmorClusterPolicy {}
1010
1149
if ! policy .IsDir () {
1011
1150
yamlBytes , err = recommend .CRDFs .ReadFile (policy .Name ())
1012
1151
if err != nil {
1013
- clusterWatcher .Log .Warnf ("error reading csp" , policy .Name ())
1152
+ clusterWatcher .Log .Warnf ("error reading csp %s " , policy .Name ())
1014
1153
continue
1015
1154
}
1016
1155
if err := runtime .DecodeInto (scheme .Codecs .UniversalDeserializer (), yamlBytes , csp ); err != nil {
1017
- clusterWatcher .Log .Warnf ("error decoding csp" , policy .Name ())
1156
+ clusterWatcher .Log .Warnf ("error decoding csp %s " , policy .Name ())
1018
1157
continue
1019
1158
}
1020
1159
}
@@ -1024,31 +1163,31 @@ func (clusterWatcher *ClusterWatcher) WatchRecommendedPolicies() error {
1024
1163
clusterWatcher .Log .Infof ("excluding csp " , csp .Name )
1025
1164
err = clusterWatcher .Secv1Client .SecurityV1 ().KubeArmorClusterPolicies ().Delete (context .Background (), csp .GetName (), metav1.DeleteOptions {})
1026
1165
if err != nil && ! metav1errors .IsNotFound (err ) {
1027
- clusterWatcher .Log .Warnf ("error deleting csp" , csp .GetName ())
1166
+ clusterWatcher .Log .Warnf ("error deleting csp %s " , csp .GetName ())
1028
1167
} else if err == nil {
1029
- clusterWatcher .Log .Infof ("deleted csp" , csp .GetName ())
1168
+ clusterWatcher .Log .Infof ("deleted csp :%s " , csp .GetName ())
1030
1169
}
1031
1170
continue
1032
1171
}
1033
1172
csp .Spec .Selector .MatchExpressions = common .RecommendedPolicies .MatchExpressions
1034
1173
_ , err = clusterWatcher .Secv1Client .SecurityV1 ().KubeArmorClusterPolicies ().Create (context .Background (), csp , metav1.CreateOptions {})
1035
1174
if err != nil && ! metav1errors .IsAlreadyExists (err ) {
1036
- clusterWatcher .Log .Warnf ("error creating csp" , csp .GetName ())
1175
+ clusterWatcher .Log .Warnf ("error creating csp %s " , csp .GetName ())
1037
1176
continue
1038
1177
} else if metav1errors .IsAlreadyExists (err ) {
1039
1178
pol , err := clusterWatcher .Secv1Client .SecurityV1 ().KubeArmorClusterPolicies ().Get (context .Background (), csp .GetName (), metav1.GetOptions {})
1040
1179
if err != nil {
1041
- clusterWatcher .Log .Warnf ("error getting csp" , csp .GetName ())
1180
+ clusterWatcher .Log .Warnf ("error getting csp %s " , csp .GetName ())
1042
1181
continue
1043
1182
}
1044
1183
if ! reflect .DeepEqual (pol .Spec .Selector .MatchExpressions , common .RecommendedPolicies .MatchExpressions ) {
1045
1184
pol .Spec .Selector .MatchExpressions = common .RecommendedPolicies .MatchExpressions
1046
1185
_ , err := clusterWatcher .Secv1Client .SecurityV1 ().KubeArmorClusterPolicies ().Update (context .Background (), pol , metav1.UpdateOptions {})
1047
1186
if err != nil {
1048
- clusterWatcher .Log .Warnf ("error updating csp" , csp .GetName ())
1187
+ clusterWatcher .Log .Warnf ("error updating csp %s " , csp .GetName ())
1049
1188
continue
1050
1189
} else {
1051
- clusterWatcher .Log .Info ("updated csp" , csp .GetName ())
1190
+ clusterWatcher .Log .Infof ("updated csp %s " , csp .GetName ())
1052
1191
}
1053
1192
}
1054
1193
} else {
@@ -1058,10 +1197,10 @@ func (clusterWatcher *ClusterWatcher) WatchRecommendedPolicies() error {
1058
1197
if ! policy .IsDir () {
1059
1198
err = clusterWatcher .Secv1Client .SecurityV1 ().KubeArmorClusterPolicies ().Delete (context .Background (), csp .GetName (), metav1.DeleteOptions {})
1060
1199
if err != nil && ! metav1errors .IsNotFound (err ) {
1061
- clusterWatcher .Log .Warnf ("error deleting csp" , csp .GetName ())
1200
+ clusterWatcher .Log .Warnf ("error deleting csp %s " , csp .GetName ())
1062
1201
continue
1063
- } else {
1064
- clusterWatcher .Log .Info ("deleted csp" , csp .GetName ())
1202
+ } else if err == nil {
1203
+ clusterWatcher .Log .Info ("deleted csp %s " , csp .GetName ())
1065
1204
}
1066
1205
}
1067
1206
}
0 commit comments