Skip to content
This repository was archived by the owner on Jul 18, 2023. It is now read-only.

Commit dca3b4a

Browse files
committed
Implement Aggregation of Increment and Time measurements
Fixes #9
1 parent bec2ce3 commit dca3b4a

File tree

9 files changed

+573
-91
lines changed

9 files changed

+573
-91
lines changed

src/InfluxDB.Collector/CollectorConfiguration.cs

+6-1
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ public class CollectorConfiguration
1010
readonly PipelinedCollectorTagConfiguration _tag;
1111
readonly PipelinedCollectorEmitConfiguration _emitter;
1212
readonly PipelinedCollectorBatchConfiguration _batcher;
13+
readonly PipelinedCollectorAggregateConfiguration _aggregator;
1314

1415
public CollectorConfiguration()
1516
: this(null)
@@ -22,6 +23,7 @@ internal CollectorConfiguration(IPointEmitter parent = null)
2223
_tag = new PipelinedCollectorTagConfiguration(this);
2324
_emitter = new PipelinedCollectorEmitConfiguration(this);
2425
_batcher = new PipelinedCollectorBatchConfiguration(this);
26+
_aggregator = new PipelinedCollectorAggregateConfiguration(this);
2527
}
2628

2729
public CollectorTagConfiguration Tag => _tag;
@@ -30,6 +32,8 @@ internal CollectorConfiguration(IPointEmitter parent = null)
3032

3133
public CollectorBatchConfiguration Batch => _batcher;
3234

35+
public CollectorAggregateConfiguration Aggregate => _aggregator;
36+
3337
public MetricsCollector CreateCollector()
3438
{
3539
Action disposeEmitter;
@@ -38,6 +42,7 @@ public MetricsCollector CreateCollector()
3842
var emitter = _parent;
3943
emitter = _emitter.CreateEmitter(emitter, out disposeEmitter);
4044
emitter = _batcher.CreateEmitter(emitter, out disposeBatcher);
45+
emitter = _aggregator.CreateEmitter(emitter, out disposeEmitter);
4146

4247
return new PipelinedMetricsCollector(emitter, _tag.CreateEnricher(), () =>
4348
{
@@ -46,4 +51,4 @@ public MetricsCollector CreateCollector()
4651
});
4752
}
4853
}
49-
}
54+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
using System;
2+
using System.Collections.Generic;
3+
4+
namespace InfluxDB.Collector.Configuration
5+
{
6+
public abstract class CollectorAggregateConfiguration
7+
{
8+
public abstract CollectorConfiguration AtInterval(TimeSpan interval);
9+
10+
public abstract CollectorConfiguration SumIncrements();
11+
12+
public abstract CollectorConfiguration AggregateTimes(Func<IEnumerable<long>, double> func);
13+
}
14+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using InfluxDB.Collector.Pipeline;
4+
using InfluxDB.Collector.Pipeline.Aggregate;
5+
6+
namespace InfluxDB.Collector.Configuration
7+
{
8+
class PipelinedCollectorAggregateConfiguration : CollectorAggregateConfiguration
9+
{
10+
private readonly CollectorConfiguration _configuration;
11+
12+
bool _sumIncrements;
13+
Func<IEnumerable<long>, double> _timeAggregation;
14+
TimeSpan? _interval;
15+
16+
public PipelinedCollectorAggregateConfiguration(CollectorConfiguration configuration)
17+
{
18+
if (configuration == null) throw new ArgumentNullException(nameof(configuration));
19+
_configuration = configuration;
20+
}
21+
22+
public override CollectorConfiguration AtInterval(TimeSpan interval)
23+
{
24+
_interval = interval;
25+
return _configuration;
26+
}
27+
28+
public override CollectorConfiguration SumIncrements()
29+
{
30+
_sumIncrements = true;
31+
return _configuration;
32+
}
33+
34+
public override CollectorConfiguration AggregateTimes(Func<IEnumerable<long>, double> func)
35+
{
36+
_timeAggregation = func;
37+
return _configuration;
38+
}
39+
40+
public IPointEmitter CreateEmitter(IPointEmitter parent, out Action dispose)
41+
{
42+
if (_interval == null)
43+
{
44+
dispose = null;
45+
return parent;
46+
}
47+
48+
var aggregator = new AggregatePointEmitter(_interval.Value, _sumIncrements, _timeAggregation, parent);
49+
dispose = aggregator.Dispose;
50+
return aggregator;
51+
}
52+
}
53+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
using System;
2+
using System.Collections.Generic;
3+
4+
namespace InfluxDB.Collector.Pipeline.Aggregate
5+
{
6+
struct GroupingKey : IEquatable<GroupingKey>
7+
{
8+
private static readonly Dictionary<string, string> EmptyDict = new Dictionary<string, string>();
9+
10+
public long Bucket { get; }
11+
12+
public MeasurementKind Kind { get; }
13+
14+
public string Measurement { get; }
15+
16+
public Dictionary<string, string> Tags { get; }
17+
18+
public GroupingKey(long bucket, MeasurementKind kind, string measurement, Dictionary<string, string> tags)
19+
{
20+
Bucket = bucket;
21+
Kind = kind;
22+
Measurement = measurement;
23+
Tags = tags ?? EmptyDict;
24+
}
25+
26+
public bool Equals(GroupingKey other)
27+
{
28+
return Bucket == other.Bucket && Kind == other.Kind && Measurement == other.Measurement && DictionaryEquals(Tags, other.Tags);
29+
}
30+
31+
public override bool Equals(object obj)
32+
{
33+
if (ReferenceEquals(null, obj))
34+
{
35+
return false;
36+
}
37+
38+
return obj is GroupingKey key && Equals(key);
39+
}
40+
41+
public override int GetHashCode()
42+
{
43+
unchecked
44+
{
45+
int hashCode = Bucket.GetHashCode();
46+
hashCode = (hashCode * 397) ^ (int) Kind;
47+
hashCode = (hashCode * 397) ^ Measurement.GetHashCode();
48+
hashCode = (hashCode * 397) ^ TagsHashCode();
49+
return hashCode;
50+
}
51+
}
52+
53+
int TagsHashCode()
54+
{
55+
unchecked
56+
{
57+
int hashCode = 1;
58+
foreach (var kvp in Tags)
59+
{
60+
hashCode *= (kvp.Key.GetHashCode() * 397) ^ kvp.Key.GetHashCode();
61+
}
62+
63+
return hashCode;
64+
}
65+
}
66+
67+
static bool DictionaryEquals(Dictionary<string, string> dict, Dictionary<string, string> dict2)
68+
{
69+
if (dict.Count != dict2.Count)
70+
{
71+
return false;
72+
}
73+
74+
foreach (var kvp in dict)
75+
{
76+
if (dict2.TryGetValue(kvp.Key, out string value))
77+
{
78+
if (value != kvp.Value)
79+
{
80+
return false;
81+
}
82+
}
83+
else
84+
{
85+
return false;
86+
}
87+
}
88+
89+
return true;
90+
}
91+
}
92+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
// ==========================================================================
2+
// AggregatePointEmitter.cs
3+
// Bus Portal (busliniensuche.de)
4+
// ==========================================================================
5+
// All rights reserved.
6+
// ==========================================================================
7+
8+
using System;
9+
using System.Collections.Generic;
10+
using System.Linq;
11+
using InfluxDB.Collector.Pipeline.Common;
12+
13+
namespace InfluxDB.Collector.Pipeline.Aggregate
14+
{
15+
class AggregatePointEmitter : IntervalEmitterBase
16+
{
17+
readonly bool _sumIncrements;
18+
readonly Func<IEnumerable<long>, double> _timesAggregation;
19+
readonly IPointEmitter _parent;
20+
21+
public AggregatePointEmitter(TimeSpan timeSpan, bool sumIncrements, Func<IEnumerable<long>, double> timesAggregation, IPointEmitter parent)
22+
: base(timeSpan)
23+
{
24+
_sumIncrements = sumIncrements;
25+
_timesAggregation = timesAggregation;
26+
_parent = parent;
27+
}
28+
29+
protected override void HandleBatch(IReadOnlyCollection<PointData> batch)
30+
{
31+
var grouped = batch.GroupBy(x => new GroupingKey(
32+
x.UtcTimestamp.HasValue ? x.UtcTimestamp.Value.Ticks / _interval.Ticks : 0,
33+
DetermineKind(x),
34+
x.Measurement,
35+
x.Tags
36+
));
37+
38+
var aggregated = grouped.SelectMany(Aggregate).ToArray();
39+
40+
_parent.Emit(aggregated);
41+
}
42+
43+
IEnumerable<PointData> Aggregate(IGrouping<GroupingKey, PointData> group)
44+
{
45+
GroupingKey key = group.Key;
46+
MeasurementKind kind = key.Kind;
47+
48+
if (kind == MeasurementKind.Increment && _sumIncrements)
49+
{
50+
long sum = group.Sum(x => (long) x.Fields["count"]);
51+
return new[]
52+
{
53+
new PointData(
54+
key.Measurement,
55+
new Dictionary<string, object> { { "count", sum } },
56+
key.Tags,
57+
AverageTime(key))
58+
};
59+
}
60+
61+
if (kind == MeasurementKind.Time && _timesAggregation != null)
62+
{
63+
long ticks = (long) _timesAggregation(group.Select(x => ((TimeSpan) x.Fields["value"]).Ticks));
64+
return new[]
65+
{
66+
new PointData(
67+
key.Measurement,
68+
new Dictionary<string, object> { { "value", new TimeSpan(ticks) } },
69+
key.Tags,
70+
AverageTime(key))
71+
};
72+
}
73+
74+
return group;
75+
}
76+
77+
private DateTime AverageTime(GroupingKey key)
78+
{
79+
return new DateTime(key.Bucket * _interval.Ticks + _interval.Ticks / 2, DateTimeKind.Utc);
80+
}
81+
82+
static MeasurementKind DetermineKind(PointData x)
83+
{
84+
if (x.Fields.Count != 1) return MeasurementKind.Other;
85+
86+
if (x.Fields.TryGetValue("count", out var count) && count is long)
87+
{
88+
return MeasurementKind.Increment;
89+
}
90+
else if (x.Fields.TryGetValue("value", out var value) && value is TimeSpan)
91+
{
92+
return MeasurementKind.Time;
93+
}
94+
else
95+
{
96+
return MeasurementKind.Other;
97+
}
98+
}
99+
}
100+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
namespace InfluxDB.Collector.Pipeline.Aggregate
2+
{
3+
public enum MeasurementKind
4+
{
5+
Other = 0, Increment, Time
6+
}
7+
}

0 commit comments

Comments
 (0)