Reduce/aggregate and distinct

Distinct

Distinct removes duplicates based on a given key.

contextStream
    .CrossApply("create values from enumeration", ctx => Enumerable.Range(1, 100)
        .Select(i => new
        {
            OutputId = i % 11,
            Label = $"{ctx}{i % 11}",
            Description = (i % 5 == 0) ? null : $"Description {i}"
        }))
    .Distinct("Distinct ", i => i.OutputId)
    .Do("print to console", i => Console.WriteLine(i));

Input stream:

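| OutputId | Label | Description |
|---|---|---|
| 1 | Label 1 | Description 1 |
| 2 | | Description 2 |
| 3 | | Description 3 |
| 4 | Label 4 | Description 4 |
| 5 | Label 5 | |
| 6 | Label 6 | Description 6 |
| 0 | Label 0 | Description 0 |
| 1 | Label 1 | |
| 2 | Label 2 | Description 2 |
| 3 | Label 3 | Description 3 |
| 4 | | Description 4 |
| 5 | | Description 5 |
| 6 | Label 6 | Description 6 |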

The regular Distinct provides this result:

| OutputId | Label | Description |
|---|---|---|
| 1 | Label 1 | Description 1 |
| 2 | | Description 2 |
| 3 | | Description 3 |
| 4 | Label 4 | Description 4 |
| 5 | Label 5 | |
| 6 | Label 6 | Description 6 |
| 0 | Label 0 | Description 0 |

Sometimes bad input files fill in a column for some records but not for others. In that situation, a common requirement is to keep the first non-null value found for each column instead of taking the first record as is. Here is the output stream we would like to obtain:

| OutputId | Label | Description |
|---|---|---|
| 1 | Label 1 | Description 1 |
| 2 | Label 2 | Description 2 |
| 3 | Label 3 | Description 3 |
| 4 | Label 4 | Description 4 |
| 5 | Label 5 | Description 5 |
| 6 | Label 6 | Description 6 |
| 0 | Label 0 | Description 0 |

To obtain this result, pass true as the optional parameter that follows the key selector of the Distinct operator.

contextStream
    .CrossApply("create values from enumeration", ctx => Enumerable.Range(1, 100)
        .Select(i => new
        {
            OutputId = i % 11,
            Label = $"{ctx}{i % 11}",
            Description = (i % 5 == 0) ? null : $"Description {i}"
        }))
    .Distinct("Distinct ", i => i.OutputId, true)
    .Do("print to console", i => Console.WriteLine(i));
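
To make the merging behaviour concrete, here is a minimal sketch in plain LINQ rather than with the stream operators. It assumes the option keeps, for each key, the first non-null value found in each column, which is what the target table above suggests. The sample rows and the `merged` variable are illustrative only and are not part of the library.

```csharp
#nullable enable
using System;
using System.Linq;

// Illustrative sample rows: some records lack a Label or a Description.
var rows = new (int OutputId, string? Label, string? Description)[]
{
    (1, "Label 1", "Description 1"),
    (2, null,      "Description 2"),
    (5, "Label 5", null),
    (1, "Label 1", null),
    (2, "Label 2", "Description 2"),
    (5, null,      "Description 5"),
};

// Assumed semantics: for each key, keep the first non-null value of every column.
var merged = rows
    .GroupBy(r => r.OutputId)
    .Select(g => (
        OutputId: g.Key,
        Label: g.Select(r => r.Label).FirstOrDefault(v => v != null),
        Description: g.Select(r => r.Description).FirstOrDefault(v => v != null)))
    .ToList();

foreach (var row in merged)
    Console.WriteLine($"{row.OutputId} | {row.Label} | {row.Description}");
```

With these rows, key 2 ends up with the Label of its second record and the Description of its first, so no column is left empty when at least one record provides it.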

Group By

In the "create several files" recipe, GroupBy was used to group payloads into one sub process per key. The GroupBy operator can also be used to group payloads into one list per key.

contextStream
    .CrossApply("create values from enumeration", ctx => Enumerable.Range(1, 100)
        .Select(i => new
        {
            OutputId = i % 11,
            Label = $"{ctx}{i}",
            Description = (i % 5 == 0) ? null : $"Description {i}"
        }))
    .GroupBy("group by OutputId", i => i.OutputId)
    .Do("print to console", i => Console.WriteLine($"{i.Key}: {i.Aggregation.Count} items"));

Aggregate

GroupBy simply groups payloads into lists or sub processes. Aggregate lets you perform any custom action per group of payloads. It can do more than GroupBy, but it is less straightforward to use.

contextStream
    .CrossApply("create values from enumeration", ctx => Enumerable.Range(1, 100)
        .Select(i => new
        {
            Id = i,
            OutputId = i % 11,
            Label = $"{ctx}{i}",
            Description = (i % 5 == 0) ? null : $"Description {i}"
        }))
    .Aggregate("aggregate by OutputId",
        i => i.OutputId,
        i => new { Key = i.OutputId, Ids = new List<int>() },
        (a, v) =>
        {
            a.Ids.Add(v.Id);
            return a;
        })
    .Do("print to console", i => Console.WriteLine($"{i.Key}: {i.Aggregation.Ids.Count} items"));
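
The three lambdas passed to Aggregate above (key selector, accumulator factory, fold function) can be pictured with the following plain C# sketch. `AggregateByKey` is a hypothetical helper written for this illustration, not a library function, and it only approximates the operator on an in-memory sequence.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper (not part of any library): keeps one accumulator per key.
static Dictionary<TKey, TAcc> AggregateByKey<TIn, TKey, TAcc>(
    IEnumerable<TIn> source,
    Func<TIn, TKey> getKey,
    Func<TIn, TAcc> createSeed,
    Func<TAcc, TIn, TAcc> fold) where TKey : notnull
{
    var accumulators = new Dictionary<TKey, TAcc>();
    foreach (var item in source)
    {
        var key = getKey(item);
        // Create the accumulator the first time a key shows up.
        var current = accumulators.TryGetValue(key, out var existing)
            ? existing
            : createSeed(item);
        // Fold the current item into the accumulator and store the result.
        accumulators[key] = fold(current, item);
    }
    return accumulators;
}

// Same grouping as the example above: collect the ids that share i % 11.
var idsByKey = AggregateByKey(
    Enumerable.Range(1, 100),
    id => id % 11,
    _ => new List<int>(),
    (ids, id) => { ids.Add(id); return ids; });

foreach (var (key, list) in idsByKey.OrderBy(pair => pair.Key))
    Console.WriteLine($"{key}: {list.Count} items");
```

Because the fold function is free-form, the accumulator could just as well be a sum, a maximum, or a custom object instead of a list.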

Pivot

Pivot computes several aggregations of values into a single output row per key, like PIVOT in SQL or a pivot table in Excel.

contextStream
    .CrossApply("create values from enumeration", ctx => Enumerable.Range(1, 100)
        .Select(i => new
        {
            Id = i,
            OutputId = i % 3,
            Label = $"{ctx}{i}",
            Description = (i % 5 == 0) ? null : $"Description {i}"
        }))
    .Pivot("pivot values", i => i.OutputId, i => new
    {
        Count = AggregationOperators.Count(),
        Count0 = AggregationOperators.Count().For(i.OutputId == 0),
        Count1 = AggregationOperators.Count().For(i.OutputId == 1),
        Count2 = AggregationOperators.Count().For(i.OutputId == 2)
    })
    .Do("print to console", i => Console.WriteLine($"{i.Key}: Count={i.Aggregation.Count}, Count0={i.Aggregation.Count0}, Count1={i.Aggregation.Count1}, Count2={i.Aggregation.Count2}"));
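
As a rough point of comparison, the same idea (several filtered aggregations per key, returned as one row) can be sketched with plain LINQ. This is only an in-memory approximation of what the Pivot example above appears to compute, not the operator's implementation; the `pivot` variable and tuple field names are chosen for this illustration.

```csharp
using System;
using System.Linq;

// Several aggregations per key, each one with its own filter.
var pivot = Enumerable.Range(1, 100)
    .Select(id => (Id: id, OutputId: id % 3))
    .GroupBy(r => r.OutputId)
    .Select(g => (
        Key: g.Key,
        Count: g.Count(),
        Count0: g.Count(r => r.OutputId == 0),
        Count1: g.Count(r => r.OutputId == 1),
        Count2: g.Count(r => r.OutputId == 2)))
    .OrderBy(entry => entry.Key)
    .ToList();

foreach (var row in pivot)
    Console.WriteLine($"{row.Key}: Count={row.Count}, Count0={row.Count0}, Count1={row.Count1}, Count2={row.Count2}");
```

Since the grouping key and the filters both use OutputId here, each row has exactly one non-zero filtered count, equal to its total.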

ToList

ToList aggregates the payloads of all events into a single payload: the list of every payload.

contextStream
    .CrossApply("create values from enumeration", ctx => Enumerable.Range(1, 100))
    .ToList("aggregate everything")
    .Do("print everything in one go", i => Console.WriteLine(string.Join("-", i)));