Skip to content

POC: Compare profiles within a series, to find "novelty" #4087

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions NOVELTY.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# Period A: Fairly stable consumption
https://ops.grafana-ops.net/a/grafana-pyroscope-app/explore?searchText=&panelType=time-series&layout=grid&hideNoData=off&explorationType=flame-graph&var-serviceName=profiles-prod-001%2Fingester&var-profileMetricId=process_cpu:cpu:nanoseconds:cpu:nanoseconds&var-spanSelector=undefined&var-dataSource=grafanacloud-profiles&var-filters=pod%7C%3D%7Cpyroscope-ingester-0&var-filtersBaseline=&var-filtersComparison=&var-groupBy=&maxNodes=16384&from=2025-04-08T13:20:56.014Z&to=2025-04-08T14:04:10.198Z

$ go run ./cmd/profilecli query novelty --from 2025-04-08T13:20:56Z --to 2025-04-08T14:04:00Z --query '{namespace="profiles-prod-001", pod="pyroscope-ingester-0"}'


# Period B: Starts to flush

https://ops.grafana-ops.net/a/grafana-pyroscope-app/explore?searchText=&panelType=time-series&layout=grid&hideNoData=off&explorationType=flame-graph&var-serviceName=profiles-prod-001%2Fingester&var-profileMetricId=process_cpu:cpu:nanoseconds:cpu:nanoseconds&var-spanSelector=undefined&var-dataSource=grafanacloud-profiles&var-filters=pod%7C%3D%7Cpyroscope-ingester-0&var-filtersBaseline=&var-filtersComparison=&var-groupBy=&maxNodes=16384&from=2025-04-08T14:04:35.126Z&to=2025-04-08T14:10:54.684Z

$ go run ./cmd/profilecli query novelty --from 2025-04-08T14:04:34Z --to 2025-04-08T14:10:54Z --query '{namespace="profiles-prod-001", pod="pyroscope-ingester-0"}'

8 changes: 8 additions & 0 deletions cmd/profilecli/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,9 @@ func main() {
queryLabelValuesCardinalityCmd := queryCmd.Command("label-values-cardinality", "Request label values cardinality.")
queryLabelValuesCardinalityParams := addQueryLabelValuesCardinalityParams(queryLabelValuesCardinalityCmd)

queryNoveltyCmd := queryCmd.Command("novelty", "Decide how much changed particular profile is compared to the ones before.")
queryNoveltyParams := addQueryNoveltyParams(queryNoveltyCmd)

queryTracerCmd := app.Command("query-tracer", "Analyze query traces.")
queryTracerParams := addQueryTracerParams(queryTracerCmd)

Expand Down Expand Up @@ -152,6 +155,11 @@ func main() {
os.Exit(checkError(err))
}

case queryNoveltyCmd.FullCommand():
if err := queryNovelty(ctx, queryNoveltyParams); err != nil {
os.Exit(checkError(err))
}

case queryTracerCmd.FullCommand():
if err := queryTracer(ctx, queryTracerParams); err != nil {
os.Exit(checkError(err))
Expand Down
148 changes: 148 additions & 0 deletions cmd/profilecli/query-novelty.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
package main

import (
"context"
"sort"
"strings"
"time"

"connectrpc.com/connect"
"github.com/dustin/go-humanize"
"github.com/go-kit/log/level"
"github.com/pkg/errors"

querierv1 "github.com/grafana/pyroscope/api/gen/proto/go/querier/v1"
"github.com/grafana/pyroscope/pkg/pprof/novelty"
)

// queryNoveltyParams holds the options of the "query novelty" sub-command on
// top of the shared query parameters it embeds.
type queryNoveltyParams struct {
*queryParams
TopN uint64 // upper bound on the number of highest-valued stacktraces/functions kept per step
StepSize time.Duration // Step size for the novelty query
ProfileType string // sent as ProfileTypeID with every SelectMergeProfile request
Dimension string // aggregation mode: "stacktrace-self" or "function-self"
MergeThreshold float64 // handed to novelty.NewSamples as its threshold
}

// addQueryNoveltyParams registers the flags of the "query novelty" command on
// queryCmd (in addition to the shared query flags added by addQueryParams) and
// returns the struct the parsed values are written into.
func addQueryNoveltyParams(queryCmd commander) *queryNoveltyParams {
	params := new(queryNoveltyParams)
	params.queryParams = addQueryParams(queryCmd)
	queryCmd.Flag("profile-type", "Profile type to query.").Default("process_cpu:cpu:nanoseconds:cpu:nanoseconds").StringVar(&params.ProfileType)
	// The help text used to be a copy of the top-n description.
	queryCmd.Flag("step-size", "Step size for the novelty query").Default("15s").DurationVar(&params.StepSize)
	queryCmd.Flag("top-n", "Show the top N").Default("20").Uint64Var(&params.TopN)
	queryCmd.Flag("dimension", "Aggregate stacktrace-self or function-self").Default("function-self").StringVar(&params.Dimension)
	queryCmd.Flag("merge-threshold", "Threshold when to consider profiles similar enough").Default("0.10").Float64Var(&params.MergeThreshold)
	return params
}

// queryNovelty walks the requested time range in windows of params.StepSize,
// fetches the merged profile of every window and feeds its top entries into a
// novelty.Samples collection, logging the score returned for each window.
//
// params.Dimension selects what an "entry" is:
//   - "stacktrace-self": the full stack of a sample, function names joined by "|"
//   - "function-self": the function of the first location of each sample, with
//     the values of all samples sharing that function summed up
//
// Only complete windows are queried; a trailing partial window is dropped.
// An error is returned for a non-positive step size, an unknown dimension, a
// failed query or a location that carries no line information.
func queryNovelty(ctx context.Context, params *queryNoveltyParams) (err error) {
	// A zero or negative step would never move the window past `to`.
	if params.StepSize <= 0 {
		return errors.New("step size must be positive")
	}

	from, to, err := params.parseFromTo()
	if err != nil {
		return err
	}

	samples := novelty.NewSamples(0, params.MergeThreshold)
	qc := params.phlareClient.queryClient()

	// Only the first value of every sample is looked at.
	const profileTypeIdx = 0

	for stepEnd := from.Add(params.StepSize); !stepEnd.After(to); stepEnd = stepEnd.Add(params.StepSize) {
		stepStart := stepEnd.Add(-params.StepSize)

		level.Info(logger).Log("msg", "query profile", "url", params.URL, "from", stepStart, "to", stepEnd)

		req := &querierv1.SelectMergeProfileRequest{
			ProfileTypeID: params.ProfileType,
			Start:         stepStart.UnixMilli(),
			End:           stepEnd.UnixMilli(),
			LabelSelector: params.Query,
		}

		resp, err := qc.SelectMergeProfile(ctx, connect.NewRequest(req))
		if err != nil {
			return errors.Wrap(err, "failed to query")
		}

		var (
			stacks []string
			values []int64
		)

		// NOTE(review): location and function IDs are used as 1-based indexes
		// into their tables — confirm the querier always returns them that way.
		switch params.Dimension {
		case "stacktrace-self":
			// Biggest samples first.
			sort.Slice(resp.Msg.Sample, func(i, j int) bool {
				return resp.Msg.Sample[i].Value[profileTypeIdx] > resp.Msg.Sample[j].Value[profileTypeIdx]
			})

			// Compare in uint64 so a huge TopN cannot turn negative as an int.
			end := len(resp.Msg.Sample)
			if params.TopN < uint64(end) {
				end = int(params.TopN)
			}

			stacks = make([]string, 0, end)
			values = make([]int64, 0, end)
			for _, sample := range resp.Msg.Sample[:end] {
				stackParts := make([]string, len(sample.LocationId))
				for idx := range sample.LocationId {
					loc := resp.Msg.Location[sample.LocationId[idx]-1]
					if len(loc.Line) == 0 {
						return errors.New("location without line information")
					}
					function := resp.Msg.Function[loc.Line[0].FunctionId-1]
					stackParts[idx] = resp.Msg.StringTable[function.Name]
				}
				stacks = append(stacks, strings.Join(stackParts, "|"))
				values = append(values, sample.Value[profileTypeIdx])
			}

		case "function-self":
			// Sum every sample's value onto the function of its first location.
			selfValues := make([]int64, len(resp.Msg.Function))
			for _, sample := range resp.Msg.Sample {
				if len(sample.LocationId) == 0 {
					continue
				}
				leafLoc := resp.Msg.Location[sample.LocationId[0]-1]
				if len(leafLoc.Line) == 0 {
					return errors.New("location without line information")
				}
				selfValues[leafLoc.Line[0].FunctionId-1] += sample.Value[profileTypeIdx]
			}

			// Sort a permutation instead of the function table itself, so that
			// every function name stays attached to its own value.
			order := make([]int, len(selfValues))
			for i := range order {
				order[i] = i
			}
			sort.SliceStable(order, func(i, j int) bool {
				return selfValues[order[i]] > selfValues[order[j]]
			})

			end := len(order)
			if params.TopN < uint64(end) {
				end = int(params.TopN)
			}

			stacks = make([]string, end)
			values = make([]int64, end)
			for i, fnIdx := range order[:end] {
				stacks[i] = resp.Msg.StringTable[resp.Msg.Function[fnIdx].Name]
				values[i] = selfValues[fnIdx]
			}

		default:
			return errors.New("invalid dimension: " + params.Dimension)
		}

		for i, stack := range stacks {
			level.Debug(logger).Log("stack", stack, "value", humanize.FormatInteger("", int(values[i])))
		}

		noveltyScore := samples.Add(stacks, values)

		level.Info(logger).Log("novelty score", noveltyScore)
	}

	return nil
}
198 changes: 198 additions & 0 deletions pkg/pprof/novelty/novelty.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
package novelty

import (
"sort"
)

// Sample is one entry of a profile: a stack, referenced by its index in
// Samples.stackTable, together with the value recorded for it.
type Sample struct {
stackResolved int // index of the stack string in Samples.stackTable
sample int64 // value recorded for that stack
}

// abs splits two values into the part they have in common and the part they
// differ by: match is the smaller of a and b, miss is the absolute difference
// between them.
func abs(a, b int64) (match, miss int64) {
	match, miss = a, b-a
	if a > b {
		match, miss = b, a-b
	}
	return match, miss
}

// getNovelty compares two profiles and returns matches / (matches + misses),
// a value between 0 and 1 for non-negative sample values. Despite the name,
// a HIGHER result means the profiles are MORE alike: identical profiles
// yield 1, profiles without any common stack yield 0.
//
// Both profiles must be sorted by ascending stackResolved. total1 and total2
// are the sums of the sample values of profile1 and profile2; the samples of
// profile1 are rescaled by total2/total1 so that both profiles are compared
// at the same overall weight. For every stack present in both profiles the
// smaller value counts as match and the difference as miss; a stack present
// in only one profile counts as miss entirely.
//
// When total1 is zero no rescaling is applied, and when there is nothing to
// compare at all (matches + misses == 0) the result is 0 rather than NaN.
func getNovelty(profile1 profile, total1 int64, profile2 profile, total2 int64) float64 {
	// Dividing by a zero total would give an Inf/NaN factor, and converting
	// such a float back to int64 is not well defined.
	factor := 1.0
	if total1 != 0 {
		factor = float64(total2) / float64(total1)
	}
	scale := func(sample int64) int64 {
		return int64(float64(sample) * factor)
	}

	var matches, misses int64
	idx1, idx2 := 0, 0

	// Merge-walk both sorted profiles while each still has entries.
	for idx1 < len(profile1) && idx2 < len(profile2) {
		s1, s2 := profile1[idx1], profile2[idx2]
		switch {
		case s1.stackResolved == s2.stackResolved:
			match, miss := abs(scale(s1.sample), s2.sample)
			matches += match
			misses += miss
			idx1++
			idx2++
		case s1.stackResolved < s2.stackResolved:
			// stack only present in profile1
			misses += scale(s1.sample)
			idx1++
		default:
			// stack only present in profile2
			misses += s2.sample
			idx2++
		}
	}

	// Whatever is left in either profile has no counterpart in the other.
	for ; idx1 < len(profile1); idx1++ {
		misses += scale(profile1[idx1].sample)
	}
	for ; idx2 < len(profile2); idx2++ {
		misses += profile2[idx2].sample
	}

	denominator := matches + misses
	if denominator == 0 {
		return 0
	}
	return float64(matches) / float64(denominator)
}

// mergeProfiles combines two profiles, both sorted by ascending
// stackResolved, into one profile in the same order. Stacks present in both
// inputs appear once with their values added up. total is the sum of all
// sample values of the merged profile.
func mergeProfiles(profile1, profile2 profile) (merged profile, total int64) {
	merged = make(profile, 0, len(profile1))

	i, j := 0, 0
	for i < len(profile1) && j < len(profile2) {
		left, right := profile1[i], profile2[j]
		switch {
		case left.stackResolved == right.stackResolved:
			// same stack on both sides: one entry carrying the summed value
			sum := left.sample + right.sample
			merged = append(merged, Sample{
				stackResolved: left.stackResolved,
				sample:        sum,
			})
			total += sum
			i++
			j++
		case left.stackResolved < right.stackResolved:
			merged = append(merged, left)
			total += left.sample
			i++
		default:
			merged = append(merged, right)
			total += right.sample
			j++
		}
	}

	// At most one of the inputs still has entries; take them over unchanged.
	tail1 := profile1[i:]
	merged = append(merged, tail1...)
	for _, s := range tail1 {
		total += s.sample
	}
	tail2 := profile2[j:]
	merged = append(merged, tail2...)
	for _, s := range tail2 {
		total += s.sample
	}

	return merged, total
}

// profile is a list of samples. getNovelty and mergeProfiles walk it in
// ascending stackResolved order; Samples.Add sorts it that way before use.
type profile []Sample

// Samples keeps the profiles added so far together with the table of stack
// strings their samples refer to.
type Samples struct {
threshold float64 // when is something considered a match

// stackMap maps a stack string to its index in stackTable.
stackMap map[string]int
stackTable []string

// profiles and totals are parallel: totals[i] is the sum of the sample
// values of profiles[i].
profiles []profile
totals []int64
}

// NewSamples returns an empty Samples collection. size pre-allocates room for
// that many distinct stacks; threshold is the score above which Add merges a
// profile into an existing one.
func NewSamples(size int, threshold float64) *Samples {
	s := new(Samples)
	s.stackMap = make(map[string]int, size)
	s.stackTable = make([]string, 0, size)
	s.threshold = threshold
	return s
}

// Add records a new profile given as parallel slices: value[i] is the value
// of stack[i], so value must be at least as long as stack.
//
// The profile is scored with getNovelty against every profile stored so far.
// If the best score is above the threshold, the new profile is merged into
// that best-scoring stored profile; otherwise it is stored as a new profile
// of its own. The returned value is the best score found, or 0 when nothing
// stored scored above 0 (including the very first call).
func (n *Samples) Add(stack []string, value []int64) float64 {
	current := make(profile, len(stack))
	total := int64(0)
	for idx := range stack {
		// Resolve the stack string to its table index, registering it on
		// first sight.
		pos, ok := n.stackMap[stack[idx]]
		if !ok {
			pos = len(n.stackTable)
			n.stackMap[stack[idx]] = pos
			n.stackTable = append(n.stackTable, stack[idx])
		}
		current[idx].stackResolved = pos
		current[idx].sample = value[idx]
		total += value[idx]
	}

	// getNovelty and mergeProfiles expect ascending stackResolved order.
	sort.Slice(current, func(i, j int) bool {
		return current[i].stackResolved < current[j].stackResolved
	})

	maxNovelty := 0.0
	maxNoveltyIdx := -1
	for idx, p := range n.profiles {
		score := getNovelty(p, n.totals[idx], current, total)
		if score > maxNovelty {
			maxNovelty = score
			maxNoveltyIdx = idx
		}
	}

	// maxNoveltyIdx is still -1 when no stored profile scored above 0; with a
	// negative threshold that must not be used as a slice index.
	if maxNoveltyIdx >= 0 && maxNovelty > n.threshold {
		n.profiles[maxNoveltyIdx], n.totals[maxNoveltyIdx] = mergeProfiles(n.profiles[maxNoveltyIdx], current)
		return maxNovelty
	}

	// add a new sub profile
	n.profiles = append(n.profiles, current)
	n.totals = append(n.totals, total)
	return maxNovelty
}
Loading
Loading