Skip to content

Commit d6843b4

Browse files
committed
v1.0.X-runtime
1 parent be08f7a commit d6843b4

22 files changed

+803
-86
lines changed

cmd/observe.go

+67-9
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,11 @@ var (
3030
observePolicyFile string
3131
observeSchedule string
3232
observeReport string
33+
observeMode string
3334
reportMutex sync.Mutex
3435
reportDir string = "_status"
3536
allFileInfos []FileInfo
37+
observeList []string
3638
)
3739

3840
var observeCmd = &cobra.Command{
@@ -53,6 +55,7 @@ func init() {
5355
observeCmd.Flags().StringVar(&observePolicyFile, "policy", "", "Policy file")
5456
observeCmd.Flags().StringVar(&observeSchedule, "schedule", "", "Global Cron Schedule")
5557
observeCmd.Flags().StringVar(&observeReport, "report", "", "Report Cron Schedule")
58+
observeCmd.Flags().StringVar(&observeMode, "mode", "last", "Observe mode for path monitoring")
5659

5760
}
5861

@@ -144,31 +147,84 @@ func runObserve(cmd *cobra.Command, args []string) {
144147

145148
// SCHEDULERS
146149
schedule := getScheduleForPolicy(policy, config.Flags.PolicySchedule)
147-
if schedule == "" {
148-
log.Warn().Str("policy", policy.ID).Msg("No schedule found for policy, skipping")
150+
if schedule == "" && policy.Observe == "" && policy.Runtime.Observe == "" {
151+
log.Warn().Str("policy", policy.ID).Msg("No schedule available for policy, skipping")
149152
continue
150153
}
151154

152-
if !validateCronExpression(schedule) {
155+
if schedule != "" && !validateCronExpression(schedule) {
153156
log.Error().Str("policy", policy.ID).Str("schedule", schedule).Msg("Invalid cron expression, skipping")
154157
continue
155158
}
156159
if policy.Type != "api" && policy.Type != "runtime" && policy.Type != "rego" {
157160
policy.Metadata.TargetInfo = preparePolicyPaths(policy, allFileInfos)
158161
}
159162

160-
run = true
163+
if schedule != "" {
164+
run = true
161165

162-
policyTask := createPolicyTask(policy, dispatcher)
163-
taskr.Task(schedule, policyTask)
164-
log.Info().Str("policy", policy.ID).Str("schedule", schedule).Msg("Added policy to Scheduler")
166+
policyTask := createPolicyTask(policy, dispatcher)
167+
taskr.Task(schedule, policyTask)
168+
log.Info().Str("policy", policy.ID).Str("schedule", schedule).Msg("Added policy to Scheduler")
169+
}
170+
171+
if (policy.Observe != "" && policy.Schedule != "") || (policy.Runtime.Observe != "" && policy.Schedule != "") {
172+
log.Error().Str("policy", policy.ID).Msg("Policy with both SCHEDULE and OBSERVE defined. Skipping OBSERVE directive")
173+
continue
174+
}
175+
176+
if policy.Type != "runtime" && policy.Observe != "" {
177+
178+
exists, isDirectory, _ := PathInfo(policy.Observe)
179+
180+
if exists && !PolicyExistsInCache(policy.Observe) {
181+
182+
overlaps, overlapWith := detectOverlap(observeList, policy.Observe)
183+
184+
if overlaps {
185+
log.Error().Str("policy", policy.ID).Msgf("Observe path overlaps with another policy at : %s", overlapWith)
186+
continue
187+
}
188+
189+
observeList = append(observeList, policy.Observe)
190+
191+
log.Debug().Str("policy", policy.ID).Bool("exists", exists).Bool("isDirectory", isDirectory).Msgf("Setting up watch : %s", policy.Observe)
192+
193+
StorePolicyInCache(policy.Observe, policy)
194+
195+
log.Debug().Int("Cache count", GetPolicyCacheCount()).Msg("Cache Status")
196+
197+
if PolicyExistsInCache(policy.Observe) {
198+
log.Info().Str("policy", policy.ID).Str("Observe", policy.Observe).Msg("Added policy to Path Watcher")
199+
200+
run = true
201+
202+
//path watcher
203+
go func() {
204+
defer func() {
205+
if r := recover(); r != nil {
206+
log.Error().Interface("recover", r).Msg("Panic in path watcher goroutine")
207+
}
208+
}()
209+
watchPaths(policy.Observe)
210+
}()
211+
212+
} else {
213+
log.Warn().Str("policy", policy.ID).Msg("Failed Caching the policy - investigate")
214+
}
215+
216+
} else {
217+
log.Warn().Str("policy", policy.ID).Str("path", policy.Observe).Msg("Runtime observe has invalid path, skipping")
218+
}
219+
220+
}
165221

166222
//PATH WATCHERS
167223
if policy.Type == "runtime" && policy.Runtime.Observe != "" {
168224

169225
exists, isDirectory, _ := PathInfo(policy.Runtime.Observe)
170226

171-
if exists {
227+
if exists && !PolicyExistsInCache(policy.Runtime.Observe) {
172228

173229
log.Debug().Str("policy", policy.ID).Bool("exists", exists).Bool("isDirectory", isDirectory).Msgf("Setting up watch : %s", policy.Runtime.Observe)
174230

@@ -179,6 +235,8 @@ func runObserve(cmd *cobra.Command, args []string) {
179235
if PolicyExistsInCache(policy.Runtime.Observe) {
180236
log.Info().Str("policy", policy.ID).Str("Observe", policy.Runtime.Observe).Msg("Added policy to Path Watcher")
181237

238+
run = true
239+
182240
//path watcher
183241
go func() {
184242
defer func() {
@@ -194,7 +252,7 @@ func runObserve(cmd *cobra.Command, args []string) {
194252
}
195253

196254
} else {
197-
log.Warn().Str("policy", policy.ID).Str("path", policy.Runtime.Observe).Msg("Runtime observe has invalid path, skipping")
255+
log.Error().Str("policy", policy.ID).Str("path", policy.Runtime.Observe).Msg("Runtime observe has invalid path, skipping")
198256
}
199257

200258
}

cmd/policy.go

+1
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ type Policy struct {
6868
Enforcement []Enforcement `yaml:"enforcement"`
6969
Metadata Metadata `yaml:"metadata"`
7070
FilePattern string `yaml:"filepattern"`
71+
Observe string `yaml:"observe"`
7172
Schema Schema `yaml:"_schema"`
7273
Rego Rego `yaml:"_rego"`
7374
Regex []string `yaml:"_regex"`

cmd/root.go

+4
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,10 @@ func setupLogging() {
123123
log.Info().Msgf("Host Data: %s", hostData)
124124
log.Info().Msgf("Host Fingerprint: %s", hostFingerprint)
125125

126+
// ----------------------------------------------
127+
// ---------------------------------------------- EXPERIMENTAL END
128+
// ----------------------------------------------
129+
126130
}
127131
if silentMode {
128132

cmd/target.go

+22
Original file line numberDiff line numberDiff line change
@@ -151,3 +151,25 @@ func processIgnorePatterns(ignorePatterns []string) []string {
151151
}
152152
return processedPatterns
153153
}
154+
155+
func detectOverlap(paths []string, targetPath string) (bool, string) {
156+
// Normalize the target path
157+
targetPath = filepath.Clean(targetPath)
158+
159+
for _, path := range paths {
160+
// Normalize the current path
161+
path = filepath.Clean(path)
162+
163+
// Check if the target path is the same as or a subdirectory of the current path
164+
if targetPath == path || strings.HasPrefix(targetPath, path+string(filepath.Separator)) {
165+
return true, path
166+
}
167+
168+
// Check if the current path is a subdirectory of the target path
169+
if strings.HasPrefix(path, targetPath+string(filepath.Separator)) {
170+
return true, targetPath
171+
}
172+
}
173+
174+
return false, ""
175+
}

cmd/watch.go

+163-2
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,20 @@ import (
88
"os"
99
"runtime"
1010
"strings"
11+
"sync"
1112
"time"
1213

1314
"github.com/fsnotify/fsnotify"
1415
"github.com/segmentio/ksuid"
1516
)
1617

18+
const eventCacheDuration = 2000 * time.Millisecond // Configurable duration
19+
20+
type cachedEvent struct {
21+
event fsnotify.Event
22+
timer *time.Timer
23+
}
24+
1725
type HostInfo struct {
1826
Hostname string
1927
OS string
@@ -35,7 +43,16 @@ func watchPaths(paths ...string) {
3543
defer w.Close()
3644

3745
// Start listening for events.
38-
go watchLoop(w, paths)
46+
switch observeMode {
47+
case "last":
48+
go watchLoopLastEvent(w, paths)
49+
case "first":
50+
go watchLoopFirstEvent(w, paths)
51+
case "all":
52+
go watchLoopAllEvents(w, paths)
53+
default:
54+
go watchLoopFirstEvent(w, paths)
55+
}
3956

4057
// Add all paths from the commandline.
4158
for _, p := range paths {
@@ -50,7 +67,7 @@ func watchPaths(paths ...string) {
5067
<-make(chan struct{}) // Block forever
5168
}
5269

53-
func watchLoop(w *fsnotify.Watcher, watchedPaths []string) {
70+
func watchLoopAllEvents(w *fsnotify.Watcher, watchedPaths []string) {
5471
for {
5572
select {
5673
case err, ok := <-w.Errors:
@@ -88,11 +105,143 @@ func watchLoop(w *fsnotify.Watcher, watchedPaths []string) {
88105
}
89106
}
90107

108+
func watchLoopFirstEvent(w *fsnotify.Watcher, watchedPaths []string) {
109+
eventCache := make(map[string]time.Time)
110+
var cacheMutex sync.Mutex
111+
112+
for {
113+
select {
114+
case err, ok := <-w.Errors:
115+
if !ok {
116+
return
117+
}
118+
log.Error().Msgf("Watcher error: %s", err)
119+
120+
case event, ok := <-w.Events:
121+
if !ok {
122+
return
123+
}
124+
125+
log.Debug().Msgf("Watcher caught [%s] on [%s]", event.Op.String(), event.Name)
126+
127+
// Check if we should process this event
128+
cacheMutex.Lock()
129+
lastEventTime, exists := eventCache[event.Name]
130+
now := time.Now()
131+
if !exists || now.Sub(lastEventTime) > eventCacheDuration {
132+
// Process the event and update the cache
133+
eventCache[event.Name] = now
134+
cacheMutex.Unlock()
135+
136+
// Process the event in a goroutine to avoid blocking
137+
go processEvent(event)
138+
139+
// Clean up old cache entries
140+
go cleanEventCache(&eventCache, &cacheMutex)
141+
} else {
142+
cacheMutex.Unlock()
143+
log.Debug().Msgf("Ignored duplicate event for [%s] within cache duration", event.Name)
144+
}
145+
146+
// Re-add the watch for the file if it was removed or renamed
147+
if event.Has(fsnotify.Remove) || event.Has(fsnotify.Rename) {
148+
for _, path := range watchedPaths {
149+
if path == event.Name {
150+
// Wait a short time for the file to be recreated/renamed
151+
time.Sleep(100 * time.Millisecond)
152+
if err := w.Add(path); err != nil {
153+
log.Error().Msgf("Failed to re-add watch for %s: %s", path, err)
154+
} else {
155+
log.Debug().Msgf("Re-added watch for %s", path)
156+
}
157+
break
158+
}
159+
}
160+
}
161+
}
162+
}
163+
}
164+
165+
func watchLoopLastEvent(w *fsnotify.Watcher, watchedPaths []string) {
166+
eventCache := make(map[string]*cachedEvent)
167+
var cacheMutex sync.Mutex
168+
169+
for {
170+
select {
171+
case err, ok := <-w.Errors:
172+
if !ok {
173+
return
174+
}
175+
log.Error().Msgf("Watcher error: %s", err)
176+
177+
case event, ok := <-w.Events:
178+
if !ok {
179+
return
180+
}
181+
182+
log.Debug().Msgf("Watcher caught [%s] on [%s]", event.Op.String(), event.Name)
183+
184+
cacheMutex.Lock()
185+
if cached, exists := eventCache[event.Name]; exists {
186+
// Stop the existing timer
187+
cached.timer.Stop()
188+
// Update the cached event
189+
cached.event = event
190+
// Reset the timer
191+
cached.timer.Reset(eventCacheDuration)
192+
} else {
193+
// Create a new timer for this event
194+
timer := time.AfterFunc(eventCacheDuration, func() {
195+
processLastEvent(event.Name, &eventCache, &cacheMutex)
196+
})
197+
eventCache[event.Name] = &cachedEvent{event: event, timer: timer}
198+
}
199+
cacheMutex.Unlock()
200+
201+
// Re-add the watch for the file if it was removed or renamed
202+
if event.Has(fsnotify.Remove) || event.Has(fsnotify.Rename) {
203+
for _, path := range watchedPaths {
204+
if path == event.Name {
205+
// Wait a short time for the file to be recreated/renamed
206+
time.Sleep(100 * time.Millisecond)
207+
if err := w.Add(path); err != nil {
208+
log.Error().Msgf("Failed to re-add watch for %s: %s", path, err)
209+
} else {
210+
log.Debug().Msgf("Re-added watch for %s", path)
211+
}
212+
break
213+
}
214+
}
215+
}
216+
}
217+
}
218+
}
219+
220+
// For a configurable interval:
221+
// var eventCacheDuration time.Duration = 1000 * time.Millisecond
222+
// func SetEventCacheDuration(milliseconds int) {
223+
// eventCacheDuration = time.Duration(milliseconds) * time.Millisecond
224+
// }
225+
226+
func processLastEvent(path string, cache *map[string]*cachedEvent, mutex *sync.Mutex) {
227+
mutex.Lock()
228+
defer mutex.Unlock()
229+
230+
if cached, exists := (*cache)[path]; exists {
231+
// Process the most recent event
232+
processEvent(cached.event)
233+
// Remove the event from the cache
234+
delete(*cache, path)
235+
}
236+
}
237+
91238
func processEvent(e fsnotify.Event) {
92239
log.Debug().Str("fs", e.Name).Msg(e.String())
93240

94241
policy, ok := LoadPolicyFromCache(e.Name)
95242

243+
log.Info().Msgf("Processing event [%s] on [%s]", e.Op.String(), e.Name)
244+
96245
// Check if the watcher is targeting the directory
97246
if !ok {
98247
directoryCheck := GetDirectory(e.Name)
@@ -178,3 +327,15 @@ func FingerprintHost(hostInfo *HostInfo) (string, string, error) {
178327
fingerprint := hex.EncodeToString(hash.Sum(nil))
179328
return data, fingerprint, nil
180329
}
330+
331+
func cleanEventCache(cache *map[string]time.Time, mutex *sync.Mutex) {
332+
mutex.Lock()
333+
defer mutex.Unlock()
334+
335+
now := time.Now()
336+
for path, lastEventTime := range *cache {
337+
if now.Sub(lastEventTime) > eventCacheDuration {
338+
delete(*cache, path)
339+
}
340+
}
341+
}

cmd/worker.go

+2
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ func processPolicyInWorker(e event.Event, policyType string) error {
2222
targetDir, _ := e.Get("targetDir").(string)
2323
filePaths, _ := e.Get("filePaths").([]string)
2424

25+
log.Debug().Str("policy", policy.ID).Msgf("Working [%s] [%s]", targetDir, filePaths)
26+
2527
switch policyType {
2628
case "scan":
2729
return ProcessScanType(policy, rgPath, targetDir, filePaths)

0 commit comments

Comments
 (0)