Skip to content

Commit 031c586

Browse files
committed
ext: ecsobserver Add Service Fetcher and mock
1 parent 375c05c commit 031c586

File tree

4 files changed

+325
-14
lines changed

4 files changed

+325
-14
lines changed

extension/observer/ecsobserver/fetcher.go

+107-8
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"context"
1919
"fmt"
2020
"sort"
21+
"strings"
2122

2223
"github.com/aws/aws-sdk-go/aws"
2324
"github.com/aws/aws-sdk-go/aws/request"
@@ -33,6 +34,10 @@ const (
3334
// Based on existing number from cloudwatch-agent
3435
ec2CacheSize = 2000
3536
describeContainerInstanceLimit = 100
37+
describeServiceLimit = 10
38+
// NOTE: these constants are not defined in go sdk, there are three values for deployment status.
39+
deploymentStatusActive = "ACTIVE"
40+
deploymentStatusPrimary = "PRIMARY"
3641
)
3742

3843
// ecsClient includes API required by taskFetcher.
@@ -41,6 +46,8 @@ type ecsClient interface {
4146
DescribeTasksWithContext(ctx context.Context, input *ecs.DescribeTasksInput, opts ...request.Option) (*ecs.DescribeTasksOutput, error)
4247
DescribeTaskDefinitionWithContext(ctx context.Context, input *ecs.DescribeTaskDefinitionInput, opts ...request.Option) (*ecs.DescribeTaskDefinitionOutput, error)
4348
DescribeContainerInstancesWithContext(ctx context.Context, input *ecs.DescribeContainerInstancesInput, opts ...request.Option) (*ecs.DescribeContainerInstancesOutput, error)
49+
ListServicesWithContext(ctx context.Context, input *ecs.ListServicesInput, opts ...request.Option) (*ecs.ListServicesOutput, error)
50+
DescribeServicesWithContext(ctx context.Context, input *ecs.DescribeServicesInput, opts ...request.Option) (*ecs.DescribeServicesOutput, error)
4451
}
4552

4653
// ec2Client includes API required by TaskFetcher.
@@ -49,12 +56,13 @@ type ec2Client interface {
4956
}
5057

5158
type taskFetcher struct {
52-
logger *zap.Logger
53-
ecs ecsClient
54-
ec2 ec2Client
55-
cluster string
56-
taskDefCache simplelru.LRUCache
57-
ec2Cache simplelru.LRUCache
59+
logger *zap.Logger
60+
ecs ecsClient
61+
ec2 ec2Client
62+
cluster string
63+
taskDefCache simplelru.LRUCache
64+
ec2Cache simplelru.LRUCache
65+
serviceNameFilter serviceNameFilter
5866
}
5967

6068
type taskFetcherOptions struct {
@@ -85,6 +93,11 @@ func newTaskFetcher(opts taskFetcherOptions) (*taskFetcher, error) {
8593
cluster: opts.Cluster,
8694
taskDefCache: taskDefCache,
8795
ec2Cache: ec2Cache,
96+
// TODO: after the service matcher PR is merged, use actual service name filter here.
97+
// For now, describe all the services
98+
serviceNameFilter: func(name string) bool {
99+
return true
100+
},
88101
}
89102
// Return early if any clients are mocked, caller should overrides all the clients when mocking.
90103
if fetcher.ecs != nil || fetcher.ec2 != nil {
@@ -105,9 +118,16 @@ func (f *taskFetcher) fetchAndDecorate(ctx context.Context) ([]*Task, error) {
105118
}
106119

107120
// EC2
108-
if err := f.attachContainerInstance(ctx, tasks); err != nil {
121+
if err = f.attachContainerInstance(ctx, tasks); err != nil {
109122
return nil, fmt.Errorf("attachContainerInstance failed: %w", err)
110123
}
124+
125+
// Services
126+
services, err := f.getAllServices(ctx)
127+
if err != nil {
128+
return nil, fmt.Errorf("getAllServices failed: %w", err)
129+
}
130+
f.attachService(tasks, services)
111131
return tasks, nil
112132
}
113133

@@ -246,7 +266,7 @@ func (f *taskFetcher) describeContainerInstances(ctx context.Context, instanceLi
246266
ContainerInstances: instanceList,
247267
})
248268
if err != nil {
249-
return fmt.Errorf("ecs.DescribeContainerInstance faile: %w", err)
269+
return fmt.Errorf("ecs.DescribeContainerInstance failed: %w", err)
250270
}
251271

252272
// Create the index to map ec2 id back to container instance id.
@@ -282,6 +302,85 @@ func (f *taskFetcher) describeContainerInstances(ctx context.Context, instanceLi
282302
return nil
283303
}
284304

305+
// serviceNameFilter decides if we should get detail info for a service, i.e. make the describe API call.
306+
type serviceNameFilter func(name string) bool
307+
308+
// getAllServices does not have cache like task definition or ec2 instances
309+
// because we need to get the deployment id to map service to task, which changes frequently.
310+
func (f *taskFetcher) getAllServices(ctx context.Context) ([]*ecs.Service, error) {
311+
svc := f.ecs
312+
cluster := aws.String(f.cluster)
313+
// List and filter out services we need to desribe.
314+
listReq := ecs.ListServicesInput{Cluster: cluster}
315+
var servicesToDescribe []*string
316+
for {
317+
res, err := svc.ListServicesWithContext(ctx, &listReq)
318+
if err != nil {
319+
return nil, err
320+
}
321+
for _, arn := range res.ServiceArns {
322+
segs := strings.Split(aws.StringValue(arn), "/")
323+
name := segs[len(segs)-1]
324+
if f.serviceNameFilter(name) {
325+
servicesToDescribe = append(servicesToDescribe, arn)
326+
}
327+
}
328+
if res.NextToken == nil {
329+
break
330+
}
331+
listReq.NextToken = res.NextToken
332+
}
333+
334+
// DescribeServices size limit is 10 so we need to do paging on client side.
335+
var services []*ecs.Service
336+
for i := 0; i < len(servicesToDescribe); i += describeServiceLimit {
337+
end := minInt(i+describeServiceLimit, len(servicesToDescribe))
338+
desc := &ecs.DescribeServicesInput{
339+
Cluster: cluster,
340+
Services: servicesToDescribe[i:end],
341+
}
342+
res, err := svc.DescribeServicesWithContext(ctx, desc)
343+
if err != nil {
344+
return nil, fmt.Errorf("ecs.DescribeServices failed %w", err)
345+
}
346+
services = append(services, res.Services...)
347+
}
348+
return services, nil
349+
}
350+
351+
// attachService map service to task using deployment id.
352+
// Each service can have multiple deployment and each task keep track of the deployment in task.StartedBy.
353+
func (f *taskFetcher) attachService(tasks []*Task, services []*ecs.Service) {
354+
// Map deployment ID to service name
355+
idToService := make(map[string]*ecs.Service)
356+
for _, svc := range services {
357+
for _, deployment := range svc.Deployments {
358+
status := aws.StringValue(deployment.Status)
359+
if status == deploymentStatusActive || status == deploymentStatusPrimary {
360+
idToService[aws.StringValue(deployment.Id)] = svc
361+
break
362+
}
363+
}
364+
}
365+
366+
// Attach service to task
367+
for _, t := range tasks {
368+
// Task is created using RunTask i.e. not manged by a service.
369+
if t.Task.StartedBy == nil {
370+
continue
371+
}
372+
deploymentID := aws.StringValue(t.Task.StartedBy)
373+
svc := idToService[deploymentID]
374+
// Service not found happen a lot because we only fetch services defined in ServiceConfig.
375+
// However, we fetch all the tasks, which could be started by other services no mentioned in config
376+
// or started using RunTasks API directly.
377+
if svc == nil {
378+
continue
379+
}
380+
t.Service = svc
381+
}
382+
}
383+
285384
// Util Start
286385

287386
func sortStringPointers(ps []*string) {

extension/observer/ecsobserver/fetcher_test.go

+87-2
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,7 @@ func TestFetcher_FetchAndDecorate(t *testing.T) {
3838
ec2Override: c,
3939
})
4040
require.NoError(t, err)
41-
// Create 1 task def and 10 tasks, 8 running on ec2, first 3 runs on fargate
42-
// TODO: they will be changed to include service etc.
41+
// Create 1 task def, 2 services and 10 tasks, 8 running on ec2, first 3 runs on fargate
4342
nTasks := 11
4443
nInstances := 2
4544
nFargateInstances := 3
@@ -48,9 +47,11 @@ func TestFetcher_FetchAndDecorate(t *testing.T) {
4847
ins := i % nInstances
4948
if i < nFargateInstances {
5049
task.LaunchType = aws.String(ecs.LaunchTypeFargate)
50+
task.StartedBy = aws.String("deploy0")
5151
} else {
5252
task.LaunchType = aws.String(ecs.LaunchTypeEc2)
5353
task.ContainerInstanceArn = aws.String(fmt.Sprintf("ci%d", ins))
54+
task.StartedBy = aws.String("deploy1")
5455
}
5556
task.TaskDefinitionArn = aws.String("d0:1")
5657
}))
@@ -59,11 +60,32 @@ func TestFetcher_FetchAndDecorate(t *testing.T) {
5960
ci.Ec2InstanceId = aws.String(fmt.Sprintf("i-%d", i))
6061
}))
6162
c.SetEc2Instances(ecsmock.GenEc2Instances("i-", nInstances, nil))
63+
// Service
64+
c.SetServices(ecsmock.GenServices("s", 2, func(i int, s *ecs.Service) {
65+
if i == 0 {
66+
s.LaunchType = aws.String(ecs.LaunchTypeFargate)
67+
s.Deployments = []*ecs.Deployment{
68+
{
69+
Status: aws.String("ACTIVE"),
70+
Id: aws.String("deploy0"),
71+
},
72+
}
73+
} else {
74+
s.LaunchType = aws.String(ecs.LaunchTypeEc2)
75+
s.Deployments = []*ecs.Deployment{
76+
{
77+
Status: aws.String("ACTIVE"),
78+
Id: aws.String("deploy1"),
79+
},
80+
}
81+
}
82+
}))
6283

6384
ctx := context.Background()
6485
tasks, err := f.fetchAndDecorate(ctx)
6586
require.NoError(t, err)
6687
assert.Equal(t, nTasks, len(tasks))
88+
assert.Equal(t, "s0", aws.StringValue(tasks[0].Service.ServiceArn))
6789
}
6890

6991
func TestFetcher_GetAllTasks(t *testing.T) {
@@ -220,3 +242,66 @@ func TestFetcher_AttachContainerInstance(t *testing.T) {
220242
assert.Equal(t, "i-1", aws.StringValue(tasks[nFargateInstances].EC2.InstanceId))
221243
})
222244
}
245+
246+
func TestFetcher_GetAllServices(t *testing.T) {
247+
c := ecsmock.NewCluster()
248+
f, err := newTaskFetcher(taskFetcherOptions{
249+
Logger: zap.NewExample(),
250+
Cluster: "not used",
251+
Region: "not used",
252+
ecsOverride: c,
253+
})
254+
require.NoError(t, err)
255+
const nServices = 101
256+
c.SetServices(ecsmock.GenServices("s", nServices, nil))
257+
ctx := context.Background()
258+
services, err := f.getAllServices(ctx)
259+
require.NoError(t, err)
260+
assert.Equal(t, nServices, len(services))
261+
}
262+
263+
func TestFetcher_AttachService(t *testing.T) {
264+
c := ecsmock.NewCluster()
265+
f, err := newTaskFetcher(taskFetcherOptions{
266+
Logger: zap.NewExample(),
267+
Cluster: "not used",
268+
Region: "not used",
269+
ecsOverride: c,
270+
})
271+
require.NoError(t, err)
272+
const nServices = 10
273+
c.SetServices(ecsmock.GenServices("s", nServices, func(i int, s *ecs.Service) {
274+
s.Deployments = []*ecs.Deployment{
275+
{
276+
Status: aws.String("ACTIVE"),
277+
Id: aws.String(fmt.Sprintf("deploy%d", i)),
278+
},
279+
}
280+
}))
281+
c.SetTaskDefinitions(ecsmock.GenTaskDefinitions("def", nServices, 1, nil))
282+
const nTasks = 100
283+
c.SetTasks(ecsmock.GenTasks("t", nTasks, func(i int, task *ecs.Task) {
284+
// Last task is launched manually w/o service
285+
if i == nTasks-1 {
286+
return
287+
}
288+
deployID := i % nServices
289+
task.TaskDefinitionArn = aws.String(fmt.Sprintf("def%d:1", deployID))
290+
task.StartedBy = aws.String(fmt.Sprintf("deploy%d", deployID))
291+
292+
}))
293+
294+
ctx := context.Background()
295+
rawTasks, err := f.getAllTasks(ctx)
296+
require.NoError(t, err)
297+
tasks, err := f.attachTaskDefinition(ctx, rawTasks)
298+
require.NoError(t, err)
299+
services, err := f.getAllServices(ctx)
300+
require.NoError(t, err)
301+
f.attachService(tasks, services)
302+
303+
// Just pick one
304+
assert.Equal(t, "s0", aws.StringValue(tasks[0].Service.ServiceArn))
305+
assert.NotNil(t, tasks[nTasks-2].Service)
306+
assert.Nil(t, tasks[nTasks-1].Service)
307+
}

extension/observer/ecsobserver/internal/ecsmock/service.go

+68-2
Original file line numberDiff line numberDiff line change
@@ -59,15 +59,18 @@ type Cluster struct {
5959
containerInstanceList []*ecs.ContainerInstance
6060
ec2Map map[string]*ec2.Instance // key is instance id
6161
ec2List []*ec2.Instance
62+
serviceMap map[string]*ecs.Service
63+
serviceList []*ecs.Service
6264
limit PageLimit
6365
stats ClusterStats
6466
}
6567

6668
// NewCluster creates a mock ECS cluster with default limits.
6769
func NewCluster() *Cluster {
6870
return &Cluster{
69-
taskMap: make(map[string]*ecs.Task),
70-
limit: DefaultPageLimit(),
71+
// NOTE: we don't set the maps by design, they should be injected and API calls
72+
// without setting up data should just panic.
73+
limit: DefaultPageLimit(),
7174
}
7275
}
7376

@@ -196,6 +199,45 @@ func (c *Cluster) DescribeInstancesWithContext(_ context.Context, input *ec2.Des
196199
}, nil
197200
}
198201

202+
func (c *Cluster) ListServicesWithContext(_ context.Context, input *ecs.ListServicesInput, _ ...request.Option) (*ecs.ListServicesOutput, error) {
203+
page, err := getPage(pageInput{
204+
nextToken: input.NextToken,
205+
size: len(c.serviceList),
206+
limit: c.limit.ListServiceOutput,
207+
})
208+
if err != nil {
209+
return nil, err
210+
}
211+
res := c.serviceList[page.start:page.end]
212+
return &ecs.ListServicesOutput{
213+
ServiceArns: getArns(res, func(i int) *string {
214+
return res[i].ServiceArn
215+
}),
216+
NextToken: page.nextToken,
217+
}, nil
218+
}
219+
220+
func (c *Cluster) DescribeServicesWithContext(_ context.Context, input *ecs.DescribeServicesInput, _ ...request.Option) (*ecs.DescribeServicesOutput, error) {
221+
var (
222+
failures []*ecs.Failure
223+
services []*ecs.Service
224+
)
225+
for i, serviceArn := range input.Services {
226+
arn := aws.StringValue(serviceArn)
227+
svc, ok := c.serviceMap[arn]
228+
if !ok {
229+
failures = append(failures, &ecs.Failure{
230+
Arn: serviceArn,
231+
Detail: aws.String(fmt.Sprintf("service not found index %d arn %s total services %d", i, arn, len(c.serviceMap))),
232+
Reason: aws.String("service not found"),
233+
})
234+
continue
235+
}
236+
services = append(services, svc)
237+
}
238+
return &ecs.DescribeServicesOutput{Failures: failures, Services: services}, nil
239+
}
240+
199241
// API End
200242

201243
// Hook Start
@@ -240,6 +282,16 @@ func (c *Cluster) SetEc2Instances(instances []*ec2.Instance) {
240282
c.ec2List = instances
241283
}
242284

285+
// SetServices updates the list and map.
286+
func (c *Cluster) SetServices(services []*ecs.Service) {
287+
m := make(map[string]*ecs.Service, len(services))
288+
for _, s := range services {
289+
m[aws.StringValue(s.ServiceArn)] = s
290+
}
291+
c.serviceMap = m
292+
c.serviceList = services
293+
}
294+
243295
// Hook End
244296

245297
// Generator Start
@@ -303,6 +355,20 @@ func GenEc2Instances(idPrefix string, count int, modifier func(i int, ins *ec2.I
303355
return instances
304356
}
305357

358+
func GenServices(arnPrefix string, count int, modifier func(i int, s *ecs.Service)) []*ecs.Service {
359+
var services []*ecs.Service
360+
for i := 0; i < count; i++ {
361+
svc := &ecs.Service{
362+
ServiceArn: aws.String(fmt.Sprintf("%s%d", arnPrefix, i)),
363+
}
364+
if modifier != nil {
365+
modifier(i, svc)
366+
}
367+
services = append(services, svc)
368+
}
369+
return services
370+
}
371+
306372
// Generator End
307373

308374
// pagination Start

0 commit comments

Comments
 (0)