Skip to main content

Combine streams

Combining streams is the essence of SQL. It is very important to have efficient stream combiners.

Select

Although Select is an operator dedicated to transform the content of the payload, it can also combine the content of payloads of a stream with the content of a single event stream ISingleStream.

private static void DefineProcess(ISingleStream<string> contextStream)
{
var res = contextStream
.CrossApply("create values from enumeration", ctx => Enumerable.Range(1, 100))
.Select("apply single stream content", contextStream, (l, r) => $"{l}-{r}");
// ...
}

Join and LeftJoin

There is no Join operator; just a LeftJoin exists. Like a regular SQL left join, the right value will be null if nothing matches. Like in SSIS, the left input stream must be properly sorted depending on the left pivot key, and the stream that is looked up must be keyed on depending on the right pivot key. Both keys must have exactly the same signature (name and types).

private static void DefineProcess(ISingleStream<string> contextStream)
{
var sortedStream1 = contextStream
.CrossApply("create values from enumeration", ctx => Enumerable
.Range(1, 100)
.Select(i => new { Id = i, OutputId = i % 10, label = $"Label{i}" }))
.EnsureSorted("ensure it is sorted on OutputId", i => new { i.OutputId });

var streamToLookup = contextStream
.CrossApply("create values from enumeration2", ctx => Enumerable
.Range(1, 8)
.Select(i => new { Id = i, label = $"OtherLabel{i}" }))
.EnsureKeyed("ensure it is keyed on Id", i => new { OutputId = i.Id });

var res = sortedStream1.LeftJoin("join output values", streamToLookup, (l, r) => new { FromLeft = l, FromRight = r });
// ...
}
note

LeftJoin is the operator to chose in favor of Lookup is streams are sorted because nothing remains in memory during the process. It goes without saying that internally, if streams were not sorted, the operator would have to make it itself. But it may happen that these input streams are already sorted, and then, by sorting it again, the operator would lose precious time, and moreover would have to store both streams content in memory before issuing the output. To prevent this, the way to deal with sorts must be done by the developer. Either by actually sorting, either by simply ensuring the stream is sorted. If input streams are not sorted, it will be a more convenient choice to deal with the Lookup operator.

Lookup

Lookup has the same purpose than LeftJoin and behaves in the same way, but does all the underlying mechanism is done under the hood for non sorted streams.

private static void DefineProcess(ISingleStream<string> contextStream)
{
var stream1 = contextStream
.CrossApply("create values from enumeration", ctx => Enumerable
.Range(1, 100)
.Select(i => new { Id = i, OutputId = i % 10, label = $"Label{i}" }));

var streamToLookup = contextStream
.CrossApply("create values from enumeration2", ctx => Enumerable
.Range(1, 8)
.Select(i => new { Id = i, label = $"OtherLabel{i}" }));

var res = stream1.Lookup("join output values", streamToLookup, l => l.OutputId, r => r.Id, (l, r) => new { FromLeft = l, FromRight = r });
// ...
}

Union/Merge

Merging two streams together can be done with the Union operator.

private static void DefineProcess(ISingleStream<string> contextStream)
{
var stream1 = contextStream
.CrossApply("create values from enumeration", ctx => Enumerable
.Range(1, 100)
.Select(i => new { Id = i, label = $"Label{i}" }));
var stream2 = contextStream
.CrossApply("create values from enumeration2", ctx => Enumerable
.Range(1, 8)
.Select(i => new { Id = i, label = $"OtherLabel{i}" }));

var res = stream1.Union("merge with stream 2", stream2);
// ...
}

UnionAll

private static void DefineProcess(ISingleStream<string> contextStream)
{
var stream1 = contextStream
.CrossApply("create values from enumeration", ctx => Enumerable
.Range(1, 100)
.Select(i => new { Id = i, label = $"Label{i}" }));
var stream2 = contextStream
.CrossApply("create values from enumeration2", ctx => Enumerable
.Range(1, 8)
.Select(i => new { Id = i, label = $"OtherLabel{i}" }));

var res = stream1.UnionAll("merge with stream 2", stream2);
// ...
}
info

UnionAll is a concatenation; it is the least performant way to merge two streams. Unlike in SQL, Union is faster than UnionAll because UnionAll of ETL.NET concatenates both streams. So while the first steam to complete, it needs to store every event of the second stream before emitting them once the first stream is completed.

Substract

Substract remove elements from a stream depending on what is in another stream. The criteria is based on the comparison of 2 keys.

private static void DefineProcess(ISingleStream<string> contextStream)
{
var stream1 = contextStream
.CrossApply("create values from enumeration", ctx => Enumerable
.Range(1, 100)
.Select(i => new Tmp1 { Id = i, Label = $"Label{i}" }));
var stream2 = contextStream
.CrossApply("create values from enumeration2", ctx => Enumerable
.Range(1, 8)
.Select(i => new Tmp1 { Id = i, Label = $"OtherLabel{i}" }));

// don't issue payload from stream1 if the stream2 emits a payload that contains the same `Id`
var res = stream1.Substract("merge with stream 2", stream2, i => i.Id, i => i.Id);
// ...
}

If both streams are sorted the way is more straight forward:

private static void DefineProcess(ISingleStream<string> contextStream)
{
var stream1 = contextStream
.CrossApply("create values from enumeration", ctx => Enumerable
.Range(1, 100)
.Select(i => new Tmp1 { Id = i, Label = $"Label{i}" }))
.EnsureSorted("ensure it is sorted on Id", i => new { i.Id });
var stream2 = contextStream
.CrossApply("create values from enumeration2", ctx => Enumerable
.Range(1, 8)
.Select(i => new Tmp1 { Id = i, Label = $"OtherLabel{i}" }))
.EnsureKeyed("ensure it is keyed on Id2", i => new { i.Id });

// don't issue payload from stream1 if the stream2 emits a payload that contains the same `Id`
var res = stream1.Substract("merge with stream 2", stream2);
// ...
}

Having sorted streams that are already sorted make the operator much faster. But sorting it before the operator doesn't make sense as this sort can be very heavy. It makes sense when retrieving sorted data from the database for example.

Correlate to single/many

The whole correlation mechanism is described in the normalization recipe.