using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Paillave.Etl.Core;
using Paillave.Etl.Reactive.Operators;
using Xunit;
namespace Paillave.Etl.Tests.DocExamples;
///
/// Each test in this file is a runnable mirror of an example from the
/// official documentation under documentation/docs/operators/1_core.md.
/// Whenever the doc is changed, the corresponding test must stay green; this
/// guarantees that every snippet shipped to users compiles and runs as
/// described.
///
public class CoreOperatorsExamplesTests
{
// ===================================================================
// Helpers — every example follows the same skeleton:
// await StreamProcessRunner.CreateAndExecuteAsync(rootValue, root => { ... });
// The "root" parameter is the seed of every stream; here we feed a
// small in-memory enumerable through CrossApply.
// ===================================================================
private static async Task RunAsync(Action> build)
=> await StreamProcessRunner.CreateAndExecuteAsync(0, root =>
build(root.CrossApply("seed", _ => Enumerable.Range(1, 10))));
// ----- Select ------------------------------------------------------
[Fact]
public async Task Select_DoublesEachInteger()
{
var collected = new ConcurrentBag();
var status = await StreamProcessRunner.CreateAndExecuteAsync(0, root =>
{
root.CrossApply("seed", _ => Enumerable.Range(1, 5))
.Select("double", i => i * 2)
.Do("collect", collected.Add);
});
Assert.False(status.Failed);
Assert.Equal(new[] { 2, 4, 6, 8, 10 }, collected.OrderBy(i => i));
}
[Fact]
public async Task Select_WithIndex()
{
var collected = new ConcurrentBag<(int idx, string letter)>();
var status = await StreamProcessRunner.CreateAndExecuteAsync(0, root =>
{
root.CrossApply("seed", _ => new[] { "a", "b", "c" })
.Select("with index", (letter, idx) => (idx, letter))
.Do("collect", collected.Add);
});
Assert.False(status.Failed);
Assert.Equal(new[] { (0, "a"), (1, "b"), (2, "c") }, collected.OrderBy(t => t.idx));
}
// ----- Where -------------------------------------------------------
[Fact]
public async Task Where_KeepsOnlyMatching()
{
var collected = new ConcurrentBag();
var status = await RunAsync(s => s
.Where("even only", i => i % 2 == 0)
.Do("collect", collected.Add));
Assert.False(status.Failed);
Assert.Equal(new[] { 2, 4, 6, 8, 10 }, collected.OrderBy(i => i));
}
// ----- Do (side effect) -------------------------------------------
[Fact]
public async Task Do_RunsForEveryRow()
{
int count = 0;
var status = await RunAsync(s => s.Do("count", _ => System.Threading.Interlocked.Increment(ref count)));
Assert.False(status.Failed);
Assert.Equal(10, count);
}
// ----- CrossApply --------------------------------------------------
[Fact]
public async Task CrossApply_ExpandsRowsOneToMany()
{
var collected = new ConcurrentBag();
var status = await StreamProcessRunner.CreateAndExecuteAsync(0, root =>
{
root.CrossApply("seed", _ => new[] { 2, 3 })
.CrossApply("expand", n => Enumerable.Range(1, n))
.Do("collect", collected.Add);
});
Assert.False(status.Failed);
// 2 -> 1,2 ; 3 -> 1,2,3
Assert.Equal(5, collected.Count);
Assert.Equal(new[] { 1, 1, 2, 2, 3 }, collected.OrderBy(i => i));
}
// ----- Distinct ----------------------------------------------------
[Fact]
public async Task Distinct_RemovesDuplicates()
{
var collected = new ConcurrentBag();
var status = await StreamProcessRunner.CreateAndExecuteAsync(0, root =>
{
root.CrossApply("seed", _ => new[] { 1, 1, 2, 2, 3 })
.Distinct("dedup", i => i)
.Do("collect", collected.Add);
});
Assert.False(status.Failed);
Assert.Equal(new[] { 1, 2, 3 }, collected.OrderBy(i => i));
}
public class PersonRow
{
public int Id { get; set; }
public string? FirstName { get; set; }
public string? LastName { get; set; }
}
[Fact]
public async Task Distinct_Smart_FillsMissingFieldsAcrossDuplicates()
{
// The "smart" Distinct overload (no aggregation builder; uses
// ObjectMerger reflection) merges duplicate rows by filling null
// fields with the first non-null value seen for each key.
var rows = new[]
{
new PersonRow { Id = 1, FirstName = "Alice", LastName = null },
new PersonRow { Id = 1, FirstName = null, LastName = "Smith" },
new PersonRow { Id = 2, FirstName = "Bob", LastName = "Brown" },
};
var collected = new ConcurrentBag();
var status = await StreamProcessRunner.CreateAndExecuteAsync(0, root =>
{
root.CrossApply("seed", _ => rows)
.Distinct("merge dups", p => p.Id, b => b
.ForProperty(p => p.FirstName, DistinctAggregator.FirstNotNull)
.ForProperty(p => p.LastName, DistinctAggregator.FirstNotNull))
.Do("collect", collected.Add);
});
Assert.False(status.Failed);
var byId = collected.ToDictionary(p => p.Id);
Assert.Equal("Alice", byId[1].FirstName);
Assert.Equal("Smith", byId[1].LastName);
Assert.Equal("Bob", byId[2].FirstName);
}
// ----- Aggregate ---------------------------------------------------
[Fact]
public async Task Aggregate_SumsByKey()
{
var collected = new ConcurrentBag<(string key, int sum)>();
var status = await StreamProcessRunner.CreateAndExecuteAsync(0, root =>
{
root.CrossApply("seed", _ => new[]
{
("A", 1), ("B", 2), ("A", 3), ("B", 4), ("A", 5),
})
.Aggregate(
"sum per key",
getKey: t => t.Item1,
emptyAggregation: t => 0,
aggregate: (acc, t) => acc + t.Item2)
.Do("collect", r => collected.Add((r.Key, r.Aggregation)));
});
Assert.False(status.Failed);
var byKey = collected.ToDictionary(t => t.key, t => t.sum);
Assert.Equal(9, byKey["A"]);
Assert.Equal(6, byKey["B"]);
}
// ----- GroupBy with sub-process ------------------------------------
[Fact]
public async Task GroupBy_WithSubProcess_CountsPerGroup()
{
// GroupBy starts a tiny "sub-pipeline" per group; here we just
// count the rows of each group and emit a single (key, count) row.
var collected = new ConcurrentBag<(string key, int count)>();
var status = await StreamProcessRunner.CreateAndExecuteAsync(0, root =>
{
root.CrossApply("seed", _ => new[]
{
("A", 1), ("B", 2), ("A", 3), ("B", 4), ("A", 5),
})
.GroupBy(
"per key",
getKey: t => t.Item1,
subProcess: (subStream, first) => subStream
.Aggregate("count", _ => first.Item1, _ => 0, (acc, _) => acc + 1)
.Select("project", r => (r.Key, r.Aggregation)))
.Do("collect", collected.Add);
});
Assert.False(status.Failed);
var byKey = collected.ToDictionary(t => t.key, t => t.count);
Assert.Equal(3, byKey["A"]);
Assert.Equal(2, byKey["B"]);
}
// ----- Sort / EnsureSorted -----------------------------------------
[Fact]
public async Task Sort_OrdersTheStream()
{
var collected = new List();
var status = await StreamProcessRunner.CreateAndExecuteAsync(0, root =>
{
root.CrossApply("seed", _ => new[] { 3, 1, 4, 1, 5, 9, 2, 6 })
.Sort("ascending", i => i)
.Do("collect", collected.Add); // Sort preserves order downstream
});
Assert.False(status.Failed);
Assert.Equal(new[] { 1, 1, 2, 3, 4, 5, 6, 9 }, collected);
}
// ----- UnionAll ----------------------------------------------------
[Fact]
public async Task UnionAll_ConcatenatesTwoStreams()
{
var collected = new ConcurrentBag();
var status = await StreamProcessRunner.CreateAndExecuteAsync(0, root =>
{
var left = root.CrossApply("left", _ => new[] { 1, 2, 3 });
var right = root.CrossApply("right", _ => new[] { 4, 5, 6 });
left.UnionAll("merge", right).Do("collect", collected.Add);
});
Assert.False(status.Failed);
Assert.Equal(new[] { 1, 2, 3, 4, 5, 6 }, collected.OrderBy(i => i));
}
// ----- Top / Skip --------------------------------------------------
[Fact]
public async Task Top_KeepsFirstN()
{
var collected = new List();
var status = await StreamProcessRunner.CreateAndExecuteAsync(0, root =>
{
root.CrossApply("seed", _ => Enumerable.Range(1, 100))
.Top("first 3", 3)
.Do("collect", collected.Add);
});
Assert.False(status.Failed);
Assert.Equal(new[] { 1, 2, 3 }, collected);
}
[Fact]
public async Task Skip_DropsFirstN()
{
var collected = new List();
var status = await StreamProcessRunner.CreateAndExecuteAsync(0, root =>
{
root.CrossApply("seed", _ => Enumerable.Range(1, 5))
.Skip("drop 2", 2)
.Do("collect", collected.Add);
});
Assert.False(status.Failed);
Assert.Equal(new[] { 3, 4, 5 }, collected);
}
// ----- First / Last ------------------------------------------------
[Fact]
public async Task First_PromotesFirstRowToSingleStream()
{
int? captured = null;
var status = await StreamProcessRunner.CreateAndExecuteAsync(0, root =>
{
root.CrossApply("seed", _ => Enumerable.Range(10, 5))
.First("first")
.Do("capture", v => captured = v);
});
Assert.False(status.Failed);
Assert.Equal(10, captured);
}
// ----- Pivot -------------------------------------------------------
public class PivotKV { public string K { get; set; } = ""; public int V { get; set; } }
[Fact]
public async Task Pivot_SumAndMaxOnDescriptor()
{
var collected = new ConcurrentBag<(string key, int sum, int max)>();
var status = await StreamProcessRunner.CreateAndExecuteAsync(0, root =>
{
root.CrossApply("seed", _ => new[]
{
new PivotKV { K = "A", V = 1 },
new PivotKV { K = "A", V = 2 },
new PivotKV { K = "B", V = 5 },
})
.Pivot("sum + max", t => t.K, t => new
{
Sum = AggregationOperators.Sum(t.V),
Max = AggregationOperators.Max(t.V),
})
.Do("collect", r => collected.Add((r.Key, r.Aggregation.Sum, r.Aggregation.Max)));
});
Assert.False(status.Failed);
var byKey = collected.ToDictionary(t => t.key);
Assert.Equal(3, byKey["A"].sum);
Assert.Equal(2, byKey["A"].max);
Assert.Equal(5, byKey["B"].sum);
Assert.Equal(5, byKey["B"].max);
}
// ----- Lookup (non-correlated, plain join) -------------------------
public class CountryRow { public string Code { get; set; } = ""; public string Name { get; set; } = ""; }
public class RowKV { public int K { get; set; } public string V { get; set; } = ""; }
[Fact]
public async Task Lookup_EnrichesLeftWithRight()
{
var collected = new ConcurrentBag();
var status = await StreamProcessRunner.CreateAndExecuteAsync(0, root =>
{
var people = root.CrossApply("people",
_ => new[] { ("Alice", "FR"), ("Bob", "DE"), ("Eve", "US") });
var countries = root.CrossApply("countries", _ => new[]
{
new CountryRow { Code = "FR", Name = "France" },
new CountryRow { Code = "DE", Name = "Germany" },
new CountryRow { Code = "US", Name = "USA" },
});
people.Lookup("enrich",
rightStream: countries,
leftKey: p => p.Item2,
rightKey: c => c.Code,
resultSelector: (p, c) => $"{p.Item1} lives in {c.Name}")
.Do("collect", collected.Add);
});
Assert.False(status.Failed);
Assert.Contains("Alice lives in France", collected);
Assert.Contains("Bob lives in Germany", collected);
Assert.Contains("Eve lives in USA", collected);
}
// ----- LeftJoin (sorted/keyed) -------------------------------------
[Fact]
public async Task LeftJoin_KeepsLeftRowsEvenWithoutMatch()
{
var collected = new ConcurrentBag<(string left, string? right)>();
var status = await StreamProcessRunner.CreateAndExecuteAsync(0, root =>
{
var left = root.CrossApply("left",
_ => new[]
{
new RowKV { K = 1, V = "a" },
new RowKV { K = 2, V = "b" },
new RowKV { K = 3, V = "c" },
}).Sort("L sort", t => t.K);
var right = root.CrossApply("right",
_ => new[]
{
new RowKV { K = 1, V = "X" },
new RowKV { K = 3, V = "Z" },
})
.EnsureKeyed("R keyed", t => t.K);
left.LeftJoin("join",
rightStream: right,
resultSelector: (l, r) => (l.V, r?.V))
.Do("collect", collected.Add);
});
Assert.False(status.Failed);
var byLeft = collected.ToDictionary(t => t.left);
Assert.Equal("X", byLeft["a"].right);
Assert.Null(byLeft["b"].right);
Assert.Equal("Z", byLeft["c"].right);
}
// ----- Substract ---------------------------------------------------
[Fact]
public async Task Substract_RemovesRowsPresentInRight()
{
var collected = new ConcurrentBag();
var status = await StreamProcessRunner.CreateAndExecuteAsync(0, root =>
{
var left = root.CrossApply("left", _ => new[] { 1, 2, 3, 4, 5 });
var right = root.CrossApply("right", _ => new[] { 2, 4 });
left.Substract("diff", right, l => l, r => r)
.Do("collect", collected.Add);
});
Assert.False(status.Failed);
Assert.Equal(new[] { 1, 3, 5 }, collected.OrderBy(i => i));
}
// ----- WithPrevious ------------------------------------------------
[Fact]
public async Task WithPrevious_ExposesSlidingWindow()
{
var collected = new List<(int? prev, int curr)>();
var status = await StreamProcessRunner.CreateAndExecuteAsync(0, root =>
{
root.CrossApply("seed", _ => new[] { 10, 20, 30, 40 })
.WithPrevious("window 2", 2, window =>
{
// window[0] = current, window[1] = previous (when present)
var prev = window.Length >= 2 ? (int?)window[1] : null;
return (prev, window[0]);
})
.Do("collect", collected.Add);
});
Assert.False(status.Failed);
Assert.Equal((null, 10), collected[0]);
Assert.Equal((10, 20), collected[1]);
Assert.Equal((20, 30), collected[2]);
Assert.Equal((30, 40), collected[3]);
}
// ----- Chunk -------------------------------------------------------
[Fact]
public async Task Chunk_BatchesRows()
{
var sizes = new ConcurrentBag();
var status = await StreamProcessRunner.CreateAndExecuteAsync(0, root =>
{
root.CrossApply("seed", _ => Enumerable.Range(1, 7))
.Chunk("by 3", 3)
.Do("collect sizes", batch => sizes.Add(batch.Count()));
});
Assert.False(status.Failed);
// 7 rows in batches of 3 => sizes [3,3,1] in some order
Assert.Equal(new[] { 1, 3, 3 }, sizes.OrderBy(i => i));
}
// ----- ToList ------------------------------------------------------
[Fact]
public async Task ToList_CollectsTheStreamIntoASingle()
{
List? captured = null;
var status = await StreamProcessRunner.CreateAndExecuteAsync(0, root =>
{
root.CrossApply("seed", _ => Enumerable.Range(1, 5))
.ToList("collect")
.Do("capture", l => captured = l);
});
Assert.False(status.Failed);
Assert.Equal(new[] { 1, 2, 3, 4, 5 }, captured!.OrderBy(i => i));
}
// ----- OfType ------------------------------------------------------
public abstract class Animal { public string Name { get; set; } = ""; }
public class Dog : Animal { public string Breed { get; set; } = ""; }
public class Cat : Animal { public bool Indoor { get; set; } }
[Fact]
public async Task OfType_FiltersBySubtype()
{
var collected = new ConcurrentBag();
var status = await StreamProcessRunner.CreateAndExecuteAsync(0, root =>
{
root.CrossApply("seed", _ => new Animal[]
{
new Dog { Name = "Rex", Breed = "Lab" },
new Cat { Name = "Mia", Indoor = true },
new Dog { Name = "Bo", Breed = "Pug" },
})
.OfType("dogs only")
.Do("collect", d => collected.Add($"{d.Name} ({d.Breed})"));
});
Assert.False(status.Failed);
Assert.Equal(2, collected.Count);
Assert.Contains("Rex (Lab)", collected);
Assert.Contains("Bo (Pug)", collected);
}
// ----- Combine (single streams) ------------------------------------
[Fact]
public async Task Combine_BindsTwoSinglesIntoOne()
{
int? combined = null;
var status = await StreamProcessRunner.CreateAndExecuteAsync(0, root =>
{
var a = root.CrossApply("A", _ => new[] { 10 }).EnsureSingle("ensure A");
var b = root.CrossApply("B", _ => new[] { 32 }).EnsureSingle("ensure B");
a.Combine("sum", b, (x, y) => x + y)
.Do("capture", v => combined = v);
});
Assert.False(status.Failed);
Assert.Equal(42, combined);
}
// ----- SetForCorrelation / CorrelateToSingle -----------------------
[Fact]
public async Task CorrelateToSingle_PairsTwoStreamsByCorrelationToken()
{
// SetForCorrelation tags every row with a fresh GUID. After
// branching, CorrelateToSingle re-pairs rows that originate from
// the same source row by token.
var collected = new ConcurrentBag();
var status = await StreamProcessRunner.CreateAndExecuteAsync(0, root =>
{
var src = root.CrossApply("seed",
_ => new[] { (1, "Alice"), (2, "Bob") })
.SetForCorrelation("tag");
var ids = src.Select("project id", r => r.Item1);
var names = src.Select("project name", r => r.Item2);
ids.CorrelateToSingle("rejoin", names, (id, name) => $"{id}={name}")
.DoCorrelated("collect", collected.Add);
});
Assert.False(status.Failed);
Assert.Contains("1=Alice", collected);
Assert.Contains("2=Bob", collected);
}
// ----- Fix ---------------------------------------------------------
public class CityRow
{
public string Name { get; set; } = "";
public string Country { get; set; } = "";
}
[Fact]
public async Task Fix_SetsDefaultsForMissingFields()
{
var collected = new ConcurrentBag();
var status = await StreamProcessRunner.CreateAndExecuteAsync(0, root =>
{
root.CrossApply("seed", _ => new[]
{
new CityRow { Name = "Paris", Country = "FR" },
new CityRow { Name = "Tokyo" }, // Country missing
})
.Fix("default country", f => f
.FixProperty(c => c.Country).IfNullWith(_ => "??"))
.Do("collect", collected.Add);
});
Assert.False(status.Failed);
var byName = collected.ToDictionary(c => c.Name);
Assert.Equal("FR", byName["Paris"].Country);
Assert.Equal("??", byName["Tokyo"].Country);
}
// ----- SubProcess --------------------------------------------------
[Fact]
public async Task SubProcess_WrapsAReusablePipelineSegment()
{
var collected = new ConcurrentBag();
var status = await StreamProcessRunner.CreateAndExecuteAsync(0, root =>
{
root.CrossApply("seed", _ => Enumerable.Range(1, 10))
.SubProcess("only odd squared", first => first
.CrossApply("expand", _ => new[] { 0 }) // re-streamify
.Where("odd", i => i % 2 == 1)
.Select("square", i => i * i))
.Do("collect", collected.Add);
});
Assert.False(status.Failed);
// SubProcess sees ISingleStream, but here we just wanted to show
// it compiles and runs; the meaningful pattern is in recipes.
Assert.False(status.Failed);
}
}