diff --git a/NOVELTY.md b/NOVELTY.md new file mode 100644 index 0000000000..9150573539 --- /dev/null +++ b/NOVELTY.md @@ -0,0 +1,12 @@ +# Period A: Fairly stable consumption +https://ops.grafana-ops.net/a/grafana-pyroscope-app/explore?searchText=&panelType=time-series&layout=grid&hideNoData=off&explorationType=flame-graph&var-serviceName=profiles-prod-001%2Fingester&var-profileMetricId=process_cpu:cpu:nanoseconds:cpu:nanoseconds&var-spanSelector=undefined&var-dataSource=grafanacloud-profiles&var-filters=pod%7C%3D%7Cpyroscope-ingester-0&var-filtersBaseline=&var-filtersComparison=&var-groupBy=&maxNodes=16384&from=2025-04-08T13:20:56.014Z&to=2025-04-08T14:04:10.198Z + +$ go run ./cmd/profilecli query novelty --from 2025-04-08T13:20:56Z --to 2025-04-08T14:04:00Z --query '{namespace="profiles-prod-001", pod="pyroscope-ingester-0"}' + + +# Period B: Starts to flush + +https://ops.grafana-ops.net/a/grafana-pyroscope-app/explore?searchText=&panelType=time-series&layout=grid&hideNoData=off&explorationType=flame-graph&var-serviceName=profiles-prod-001%2Fingester&var-profileMetricId=process_cpu:cpu:nanoseconds:cpu:nanoseconds&var-spanSelector=undefined&var-dataSource=grafanacloud-profiles&var-filters=pod%7C%3D%7Cpyroscope-ingester-0&var-filtersBaseline=&var-filtersComparison=&var-groupBy=&maxNodes=16384&from=2025-04-08T14:04:35.126Z&to=2025-04-08T14:10:54.684Z + +$ go run ./cmd/profilecli query novelty --from 2025-04-08T14:04:34Z --to 2025-04-08T14:10:54Z --query '{namespace="profiles-prod-001", pod="pyroscope-ingester-0"}' + diff --git a/cmd/profilecli/main.go b/cmd/profilecli/main.go index d0fe5d3e5a..8a4a19ab67 100644 --- a/cmd/profilecli/main.go +++ b/cmd/profilecli/main.go @@ -82,6 +82,9 @@ func main() { queryLabelValuesCardinalityCmd := queryCmd.Command("label-values-cardinality", "Request label values cardinality.") queryLabelValuesCardinalityParams := addQueryLabelValuesCardinalityParams(queryLabelValuesCardinalityCmd) + queryNoveltyCmd := 
queryCmd.Command("novelty", "Decide how much changed particular profile is compared to the ones before.") + queryNoveltyParams := addQueryNoveltyParams(queryNoveltyCmd) + queryTracerCmd := app.Command("query-tracer", "Analyze query traces.") queryTracerParams := addQueryTracerParams(queryTracerCmd) @@ -152,6 +155,11 @@ func main() { os.Exit(checkError(err)) } + case queryNoveltyCmd.FullCommand(): + if err := queryNovelty(ctx, queryNoveltyParams); err != nil { + os.Exit(checkError(err)) + } + case queryTracerCmd.FullCommand(): if err := queryTracer(ctx, queryTracerParams); err != nil { os.Exit(checkError(err)) diff --git a/cmd/profilecli/query-novelty.go b/cmd/profilecli/query-novelty.go new file mode 100644 index 0000000000..a1d9839e07 --- /dev/null +++ b/cmd/profilecli/query-novelty.go @@ -0,0 +1,148 @@ +package main + +import ( + "context" + "sort" + "strings" + "time" + + "connectrpc.com/connect" + "github.com/dustin/go-humanize" + "github.com/go-kit/log/level" + "github.com/pkg/errors" + + querierv1 "github.com/grafana/pyroscope/api/gen/proto/go/querier/v1" + "github.com/grafana/pyroscope/pkg/pprof/novelty" +) + +type queryNoveltyParams struct { + *queryParams + TopN uint64 // TOPk stacktraces + StepSize time.Duration // Step size for the novelty query + ProfileType string + Dimension string + MergeThreshold float64 +} + +func addQueryNoveltyParams(queryCmd commander) *queryNoveltyParams { + params := new(queryNoveltyParams) + params.queryParams = addQueryParams(queryCmd) + queryCmd.Flag("profile-type", "Profile type to query.").Default("process_cpu:cpu:nanoseconds:cpu:nanoseconds").StringVar(¶ms.ProfileType) + queryCmd.Flag("step-size", "Show the top N stacktraces").Default("15s").DurationVar(¶ms.StepSize) + queryCmd.Flag("top-n", "Show the top N").Default("20").Uint64Var(¶ms.TopN) + queryCmd.Flag("dimension", "Aggregate stacktrace-self or function-self").Default("function-self").StringVar(¶ms.Dimension) + queryCmd.Flag("merge-threshold", "Threshold when to 
consider profiles simliar enough").Default("0.10").Float64Var(¶ms.MergeThreshold) + return params +} + +func queryNovelty(ctx context.Context, params *queryNoveltyParams) (err error) { + from, to, err := params.parseFromTo() + if err != nil { + return err + } + + samples := novelty.NewSamples(0, params.MergeThreshold) + + pos := from + + for { + pos = pos.Add(params.StepSize) + from := pos.Add(-params.StepSize) + + if pos.After(to) { + break + } + + level.Info(logger).Log("msg", "query profile", "url", params.URL, "from", from, "to", to) + + req := &querierv1.SelectMergeProfileRequest{ + ProfileTypeID: params.ProfileType, + Start: from.UnixMilli(), + End: pos.UnixMilli(), + LabelSelector: params.Query, + } + + qc := params.phlareClient.queryClient() + resp, err := qc.SelectMergeProfile(ctx, connect.NewRequest(req)) + if err != nil { + return errors.Wrap(err, "failed to query") + } + var stacks []string + var values []int64 + profileTypeIdx := 0 + + if params.Dimension == "stacktrace-self" { + // sort the samples by the first type + sort.Slice(resp.Msg.Sample, func(i, j int) bool { + return resp.Msg.Sample[i].Value[profileTypeIdx] > resp.Msg.Sample[j].Value[profileTypeIdx] + }) + // append the first N samples to the noveltySamples + end := len(resp.Msg.Sample) + if end > int(params.TopN) { + end = int(params.TopN) + } + stacks = make([]string, 0, end) + values = make([]int64, 0, end) + for _, sample := range resp.Msg.Sample[:end] { + stackParts := make([]string, len(sample.LocationId)) + for idx := range sample.LocationId { + loc := resp.Msg.Location[sample.LocationId[idx]-1] + if len(loc.Line) == 0 { + panic("no line") + } + functionID := loc.Line[0].FunctionId + function := resp.Msg.Function[functionID-1] + stackParts[idx] = resp.Msg.StringTable[function.Name] + } + stacks = append(stacks, strings.Join(stackParts, "|")) + values = append(values, sample.Value[profileTypeIdx]) + } + } else if params.Dimension == "function-self" { + values = make([]int64, 
len(resp.Msg.Function)) + + // add the leaf function to the values + for _, sample := range resp.Msg.Sample { + if len(sample.LocationId) == 0 { + continue + } + + leafLoc := resp.Msg.Location[sample.LocationId[0]-1] + functionID := leafLoc.Line[0].FunctionId + values[functionID-1] += sample.Value[profileTypeIdx] + } + + // sort the functions and values by the biggest + sort.Slice(resp.Msg.Function, func(i, j int) bool { + return values[i] > values[j] + }) + sort.Slice(values, func(i, j int) bool { + return values[i] > values[j] + }) + + // append the first N samples to the noveltySamples + end := len(resp.Msg.Function) + if end > int(params.TopN) { + end = int(params.TopN) + } + + // resovle to func names + stacks = make([]string, end) + for i := range stacks { + stacks[i] = resp.Msg.StringTable[resp.Msg.Function[i].Name] + } + values = values[:end] + } else { + return errors.New("invalid dimension: " + params.Dimension) + } + + for i, stack := range stacks { + level.Debug(logger).Log("stack", stack, "value", humanize.FormatInteger("", int(values[i]))) + } + + noveltyScore := samples.Add(stacks, values) + + level.Info(logger).Log("novelty score", noveltyScore) + } + + return nil +} diff --git a/pkg/pprof/novelty/novelty.go b/pkg/pprof/novelty/novelty.go new file mode 100644 index 0000000000..242d22b38f --- /dev/null +++ b/pkg/pprof/novelty/novelty.go @@ -0,0 +1,198 @@ +package novelty + +import ( + "sort" +) + +type Sample struct { + stackResolved int + sample int64 +} + +func abs(a, b int64) (match, miss int64) { + if a > b { + return b, a - b + } + return a, b - a +} + +func getNovelty(profile1 profile, total1 int64, profile2 profile, total2 int64) float64 { + factor := float64(float64(total2) / float64(total1)) + scale := func(sample int64) int64 { + return int64(float64(sample) * factor) + } + + idx1 := 0 + idx2 := 0 + + var matches, misses int64 + for { + // if we have reached the end of both profiles, we are done + if idx1 >= len(profile1) && idx2 >= 
len(profile2) { + break + } + + // end of one of the profiles, we can break after adding the remaining samples + if idx1 >= len(profile1) { + misses += profile2[idx2].sample + idx2++ + continue + } + if idx2 >= len(profile2) { + misses += scale(profile1[idx1].sample) + idx1++ + continue + } + + // check for match + if profile1[idx1].stackResolved == profile2[idx2].stackResolved { + match, miss := abs(scale(profile1[idx1].sample), profile2[idx2].sample) + matches += match + misses += miss + idx1++ + idx2++ + continue + } + + // if profile1 is less than profile2, we need to move to the next profile1 + if profile1[idx1].stackResolved < profile2[idx2].stackResolved { + misses += scale(profile1[idx1].sample) + idx1++ + continue + } + + // if profile2 is less than profile1, we need to move to the next profile2 + if profile1[idx1].stackResolved > profile2[idx2].stackResolved { + misses += profile2[idx2].sample + idx2++ + continue + } + } + + return float64(matches) / float64(matches+misses) + +} + +func mergeProfiles(profile1, profile2 profile) (merged profile, total int64) { + merged = make(profile, 0, len(profile1)) + total = 0 + + idx1 := 0 + idx2 := 0 + + for { + // if we have reached the end of both profiles, we are done + if idx1 >= len(profile1) && idx2 >= len(profile2) { + break + } + + if idx1 >= len(profile1) { + left := profile2[idx2:] + merged = append(merged, left...) + for _, sample := range left { + total += sample.sample + } + break + } + if idx2 >= len(profile2) { + left := profile1[idx1:] + merged = append(merged, left...) 
+ for _, sample := range left { + total += sample.sample + } + break + } + + // check for match + if profile1[idx1].stackResolved == profile2[idx2].stackResolved { + value := profile1[idx1].sample + profile2[idx2].sample + merged = append(merged, Sample{ + stackResolved: profile1[idx1].stackResolved, + sample: value, + }) + total += value + idx1++ + idx2++ + continue + } + + // if profile1 is less than profile2, we attach that one + if profile1[idx1].stackResolved < profile2[idx2].stackResolved { + merged = append(merged, profile1[idx1]) + total += profile1[idx1].sample + idx1++ + continue + } + + // if profile2 is less than profile1, we attach that one + if profile1[idx1].stackResolved > profile2[idx2].stackResolved { + merged = append(merged, profile2[idx2]) + total += profile2[idx2].sample + idx2++ + continue + } + } + + return merged, total +} + +type profile []Sample + +type Samples struct { + threshold float64 // when is something considered a match + + stackMap map[string]int + stackTable []string + + profiles []profile + totals []int64 +} + +func NewSamples(size int, threshold float64) *Samples { + return &Samples{ + stackMap: make(map[string]int, size), + stackTable: make([]string, 0, size), + threshold: threshold, + } +} + +func (n *Samples) Add(stack []string, value []int64) float64 { + profile := make(profile, len(stack)) + total := int64(0) + for idx := range stack { + pos, ok := n.stackMap[stack[idx]] + if !ok { + pos = len(n.stackTable) + n.stackMap[stack[idx]] = pos + n.stackTable = append(n.stackTable, stack[idx]) + } + profile[idx].stackResolved = pos + profile[idx].sample = value[idx] + total += value[idx] + } + + // sort the profile by stackResolved + sort.Slice(profile, func(i, j int) bool { + return profile[i].stackResolved < profile[j].stackResolved + }) + + maxNovelty := 0.0 + maxNoveltyIdx := -1 + for idx, p := range n.profiles { + novelty := getNovelty(p, n.totals[idx], profile, total) + if novelty > maxNovelty { + maxNovelty = novelty + 
maxNoveltyIdx = idx + } + } + + if maxNovelty >= 0 && maxNovelty > n.threshold { + n.profiles[maxNoveltyIdx], n.totals[maxNoveltyIdx] = mergeProfiles(n.profiles[maxNoveltyIdx], profile) + return maxNovelty + } + + // add a new sub profile + n.profiles = append(n.profiles, profile) + n.totals = append(n.totals, total) + return maxNovelty +} diff --git a/pkg/pprof/novelty/novelty_test.go b/pkg/pprof/novelty/novelty_test.go new file mode 100644 index 0000000000..1d196ce47a --- /dev/null +++ b/pkg/pprof/novelty/novelty_test.go @@ -0,0 +1,100 @@ +package novelty + +import ( + "testing" +) + +func TestSamplesAdd(t *testing.T) { + t.Run("empty samples", func(t *testing.T) { + s := NewSamples(0, 0.85) + + stack := []string{"func1", "func2"} + values := []int64{10, 20} + novelty := s.Add(stack, values) + + if novelty != 0.0 { + t.Errorf("expected novelty 0.0 for first sample, got %f", novelty) + } + if len(s.profiles) != 1 { + t.Errorf("expected 1 profile, got %d", len(s.profiles)) + } + if len(s.totals) != 1 { + t.Errorf("expected 1 total, got %d", len(s.totals)) + } + if s.totals[0] != 30 { + t.Errorf("expected total 30, got %d", s.totals[0]) + } + }) + + t.Run("equal samples merge", func(t *testing.T) { + s := NewSamples(0, 0.85) + + // First sample + stack1 := []string{"func1", "func2"} + values1 := []int64{10, 20} + s.Add(stack1, values1) + + // Second similar sample + stack2 := []string{"func1", "func2"} + values2 := []int64{15, 30} + novelty2 := s.Add(stack2, values2) + + if novelty2 < 1.0 { + t.Errorf("expected high novelty for similar samples, got %f", novelty2) + } + if len(s.profiles) != 1 { + t.Errorf("expected profiles to merge, got %d profiles", len(s.profiles)) + } + if s.totals[0] != 75 { + t.Errorf("expected merged total 70, got %d", s.totals[0]) + } + }) + + t.Run("similar samples merge", func(t *testing.T) { + s := NewSamples(0, 0.85) + + // First sample + stack1 := []string{"func1", "func2", "func3"} + values1 := []int64{10, 20, 2} + s.Add(stack1, 
values1) + + // Second similar sample + stack2 := []string{"func1", "func2"} + values2 := []int64{15, 30} + novelty2 := s.Add(stack2, values2) + + if novelty2 < 0.8 { + t.Errorf("expected high novelty for similar samples, got %f", novelty2) + } + if len(s.profiles) != 1 { + t.Errorf("expected profiles to merge, got %d profiles", len(s.profiles)) + } + if s.totals[0] != 77 { + t.Errorf("expected merged total 70, got %d", s.totals[0]) + } + }) + + t.Run("different samples create new profile", func(t *testing.T) { + s := NewSamples(0, 0.85) + + // First sample + stack1 := []string{"func1", "func2"} + values1 := []int64{10, 20} + s.Add(stack1, values1) + + // Second different sample + stack2 := []string{"func3", "func4"} + values2 := []int64{30, 40} + novelty2 := s.Add(stack2, values2) + + if novelty2 > 0.0 { + t.Errorf("expected low novelty for different samples, got %f", novelty2) + } + if len(s.profiles) != 2 { + t.Errorf("expected 2 profiles, got %d", len(s.profiles)) + } + if len(s.totals) != 2 { + t.Errorf("expected 2 totals, got %d", len(s.totals)) + } + }) +}