Skip to content

Migrate from cgroup profiling to system-wide profiling #627

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Jul 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,6 @@ Flags:
--pod-label-selector=STRING
Label selector to control which Kubernetes
Pods to select.
--cgroups=CGROUPS,... Cgroups to profile on this node.
--systemd-units=SYSTEMD-UNITS,...
[deprecated, use --cgroups instead] systemd
units to profile on this node.
--temp-dir="" (Deprecated) Temporary directory path to use
for processing object files.
--socket-path=STRING The filesystem path to the container runtimes
Expand Down
186 changes: 30 additions & 156 deletions cmd/parca-agent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,12 @@ package main
import (
"context"
"crypto/tls"
"errors"
"fmt"
"io/ioutil"
"net"
"net/http"
"net/http/pprof"
"net/url"
"os"
"sort"
"strings"
"time"

Expand All @@ -39,8 +36,6 @@ import (
"github.com/prometheus/client_golang/prometheus/collectors"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/promql/parser"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
Expand All @@ -49,8 +44,10 @@ import (
"github.com/parca-dev/parca-agent/pkg/buildinfo"
"github.com/parca-dev/parca-agent/pkg/debuginfo"
"github.com/parca-dev/parca-agent/pkg/discovery"
"github.com/parca-dev/parca-agent/pkg/ksym"
"github.com/parca-dev/parca-agent/pkg/logger"
"github.com/parca-dev/parca-agent/pkg/target"
"github.com/parca-dev/parca-agent/pkg/objectfile"
"github.com/parca-dev/parca-agent/pkg/profiler"
"github.com/parca-dev/parca-agent/pkg/template"
)

Expand All @@ -75,9 +72,6 @@ type flags struct {
SamplingRatio float64 `kong:"help='Sampling ratio to control how many of the discovered targets to profile. Defaults to 1.0, which is all.',default='1.0'"`
Kubernetes bool `kong:"help='Discover containers running on this node to profile automatically.',default='true'"`
PodLabelSelector string `kong:"help='Label selector to control which Kubernetes Pods to select.'"`
Cgroups []string `kong:"help='Cgroups to profile on this node.'"`
// SystemdUnits is deprecated and will eventually be removed; please use the Cgroups flag instead.
SystemdUnits []string `kong:"help='[deprecated, use --cgroups instead] systemd units to profile on this node.'"`
// TempDir is deprecated and will eventually be removed.
TempDir string `kong:"help='(Deprecated) Temporary directory path to use for processing object files.',default=''"`
SocketPath string `kong:"help='The filesystem path to the container runtimes socket. Leave this empty to use the defaults.'"`
Expand Down Expand Up @@ -137,7 +131,6 @@ func main() {
level.Warn(logger).Log("msg", "--temp-dir is deprecated and will be removed in a future release.")
}

mux := http.NewServeMux()
reg := prometheus.NewRegistry()
reg.MustRegister(
collectors.NewBuildInfoCollector(),
Expand Down Expand Up @@ -179,29 +172,19 @@ func main() {
))
}

if len(flags.Cgroups) > 0 {
configs = append(configs, discovery.NewSystemdConfig(
flags.Cgroups,
flags.CgroupPath,
))
}

// TODO(javierhonduco): This is deprecated; remove it a few versions from now.
if len(flags.SystemdUnits) > 0 {
configs = append(configs, discovery.NewSystemdConfig(
flags.SystemdUnits,
flags.SystemdCgroupPath,
))
}

tm := target.NewManager(
logger, reg,
profileListener, debugInfoClient,
pp := profiler.NewProfilerPool(
logger,
reg,
ksym.NewKsymCache(logger),
objectfile.NewCache(5),
profileListener,
debugInfoClient,
flags.ProfilingDuration,
externalLabels(flags.ExternalLabel, flags.Node),
flags.SamplingRatio,
)

mux := http.NewServeMux()
mux.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{}))
mux.HandleFunc("/debug/pprof/", pprof.Index)
mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
Expand All @@ -214,63 +197,18 @@ func main() {
}
if r.URL.Path == "/" {
w.Header().Set("Content-Type", "text/html; charset=utf-8")
activeProfilers := tm.ActiveProfilers()

profilers := pp.Profilers()
statusPage := template.StatusPage{}

for _, profilerSet := range activeProfilers {
for _, profiler := range profilerSet {
profileType := ""
labelSet := labels.Labels{}

for name, value := range profiler.Labels() {
if name == "__name__" {
profileType = string(value)
}
if name != "__name__" {
labelSet = append(labelSet,
labels.Label{Name: string(name), Value: string(value)})
}
}

sort.Sort(labelSet)

q := url.Values{}
q.Add("debug", "1")
q.Add("query", labelSet.String())

statusPage.ActiveProfilers = append(statusPage.ActiveProfilers, template.ActiveProfiler{
Type: profileType,
Labels: labelSet,
Interval: flags.ProfilingDuration,
NextStartedAgo: time.Since(profiler.NextProfileStartedAt()),
Error: profiler.LastError(),
Link: fmt.Sprintf("/query?%s", q.Encode()),
})
}
}

sort.Slice(statusPage.ActiveProfilers, func(j, k int) bool {
a := statusPage.ActiveProfilers[j].Labels
b := statusPage.ActiveProfilers[k].Labels

l := len(a)
if len(b) < l {
l = len(b)
}

for i := 0; i < l; i++ {
if a[i].Name != b[i].Name {
return a[i].Name < b[i].Name
}
if a[i].Value != b[i].Value {
return a[i].Value < b[i].Value
}
}
// If all labels so far were in common, the set with fewer labels comes first.
return len(a)-len(b) < 0
})
for name, profiler := range profilers {
statusPage.ActiveProfilers = append(statusPage.ActiveProfilers, template.ActiveProfiler{
Name: name,
Interval: flags.ProfilingDuration,
NextStartedAgo: time.Since(profiler.NextProfileStartedAt()),
Error: profiler.LastError(),
})

}
err := template.StatusPageTemplate.Execute(w, statusPage)
if err != nil {
http.Error(w,
Expand All @@ -282,66 +220,6 @@ func main() {
return
}

if strings.HasPrefix(r.URL.Path, "/query") {
ctx := r.Context()
query := r.URL.Query().Get("query")
matchers, err := parser.ParseMetricSelector(query)
if err != nil {
http.Error(w,
`query incorrectly formatted, expecting selector in form of: {name1="value1",name2="value2"}`,
http.StatusBadRequest,
)
return
}

// We profile every ProfilingDuration, so we leave 1s of wiggle room. If after
// ProfilingDuration+1s no profile has matched, then there is very likely no
// profiler running that matches the label-set.
timeout := flags.ProfilingDuration + time.Second
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()

profile, err := profileListener.NextMatchingProfile(ctx, matchers)
if profile == nil || errors.Is(err, context.Canceled) {
http.Error(w, fmt.Sprintf(
"No profile taken in the last %s that matches the requested label-matchers query. "+
"Profiles are taken every %s so either the profiler matching the label-set has stopped profiling, "+
"or the label-set was incorrect.",
timeout, flags.ProfilingDuration,
), http.StatusNotFound)
return
}
if err != nil {
http.Error(w, "Unexpected error occurred: "+err.Error(), http.StatusInternalServerError)
return
}

v := r.URL.Query().Get("debug")
if v == "1" {
w.Header().Set("Content-Type", "text/html; charset=utf-8")
q := url.Values{}
q.Add("query", query)

fmt.Fprintf(
w,
"<p><a title='May take up %s to retrieve' href='/query?%s'>Download Next Pprof</a></p>\n",
flags.ProfilingDuration,
q.Encode(),
)
fmt.Fprint(w, "<code><pre>\n")
fmt.Fprint(w, profile.String())
fmt.Fprint(w, "\n</pre></code>")
return
}

w.Header().Set("Content-Type", "application/vnd.google.protobuf+gzip")
w.Header().Set("Content-Disposition", "attachment;filename=profile.pb.gz")
err = profile.Write(w)
if err != nil {
level.Error(logger).Log("msg", "failed to write profile", "err", err)
}
return
}
http.NotFound(w, r)
})

Expand All @@ -364,14 +242,6 @@ func main() {
reg := prometheus.NewRegistry()
m = discovery.NewManager(logger, reg)
var err error
if len(flags.Cgroups) > 0 || len(flags.SystemdUnits) > 0 {
err = m.ApplyConfig(ctx, map[string]discovery.Configs{"systemd": configs})

if err != nil {
level.Error(logger).Log("err", err)
os.Exit(1)
}
}

if flags.Kubernetes {
err = m.ApplyConfig(ctx, map[string]discovery.Configs{"pod": configs})
Expand All @@ -390,14 +260,18 @@ func main() {
})
}

// Run group for target manager
// Add profiler.
profiler := pp.AddProfiler(ctx, profiler.NewCPUProfiler, func() map[int]model.LabelSet {
return m.ProcessLabels()
})

// Run group for profiler.
{
ctx, cancel := context.WithCancel(ctx)
g.Add(func() error {
level.Debug(logger).Log("msg", "starting target manager")
return tm.Run(ctx, m.SyncCh())
}, func(error) {
cancel()
return profiler.Run(ctx)
}, func(err error) {
profiler.Stop()
level.Error(logger).Log("msg", "profiler ended with", "error", err, "profilerName", profiler.Name())
})
}

Expand Down
5 changes: 0 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ require (
github.com/aquasecurity/libbpfgo v0.3.0-libbpf-0.8.0
github.com/cenkalti/backoff/v4 v4.1.3
github.com/cespare/xxhash/v2 v2.1.2
github.com/containerd/cgroups v1.0.4
github.com/docker/docker v20.10.17+incompatible
github.com/dustin/go-humanize v1.0.0
github.com/go-kit/log v0.2.1
Expand Down Expand Up @@ -69,7 +68,6 @@ require (
github.com/baidubce/bce-sdk-go v0.9.111 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/clbanning/mxj v1.8.4 // indirect
github.com/coreos/go-systemd/v22 v22.3.2 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dimchansky/utfbom v1.1.1 // indirect
github.com/docker/distribution v2.8.1+incompatible // indirect
Expand All @@ -83,7 +81,6 @@ require (
github.com/go-openapi/jsonreference v0.19.6 // indirect
github.com/go-openapi/swag v0.21.1 // indirect
github.com/go-ozzo/ozzo-validation/v4 v4.3.0 // indirect
github.com/godbus/dbus/v5 v5.0.6 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang-jwt/jwt/v4 v4.2.0 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
Expand Down Expand Up @@ -126,8 +123,6 @@ require (
github.com/tencentyun/cos-go-sdk-v5 v0.7.34 // indirect
github.com/thanos-io/objstore v0.0.0-20220324141029-c4f11442aa33 // indirect
go.opencensus.io v0.23.0 // indirect
go.uber.org/atomic v1.9.0 // indirect
go.uber.org/goleak v1.1.12 // indirect
golang.org/x/crypto v0.0.0-20220214200702-86341886e292 // indirect
golang.org/x/net v0.0.0-20220708220712-1185a9018129 // indirect
golang.org/x/oauth2 v0.0.0-20220411215720-9780585627b5 // indirect
Expand Down
Loading