Normalize a flat structure
In traditional ETL, normalizing flat inputs (that is, dispatching one row into several tables, with one table referencing the others) is typically far from straightforward. This is surprising, to say the least, as it is one of the purposes of the "T" (Transform) in the acronym "ETL".
ETL.NET has clear and straightforward out-of-the-box tools to handle this very common pattern when importing flat structures such as files. This mechanism is so straightforward that it seems almost magical!
Principle
This normalization is possible thanks to two features:
- A way to know the original row/rows that are at the source of the current row
- The ability to perform a Select/Update/Insert for each row to save, with Paillave.EtlNet.EntityFrameworkCore or Paillave.EtlNet.SqlServer
The normalization pattern relies on the ability to know which initial row or rows led to a given payload.
Behind the scenes, each row has a list of unique identifiers linked to it, and each operator is supposed to know how to combine these uids depending on its purpose. When a stream is issuing payloads that have a list of unique identifiers under the hood, it is called a correlated stream.
Out of the box, streams are not correlated, for performance and memory reasons. For a stream to be correlated, it must pass through a dedicated operator, SetForCorrelation. This operator simply assigns to each row a list that contains a single, distinct unique identifier. Every operator of the core of ETL.NET knows how to handle a correlated stream and issues a properly correlated stream according to its own logic.
For example, let's imagine this sequence of events:
Value1 | Value2 |
---|---|
1 | z |
2 | z |
3 | d |
4 | d |
5 | e |
6 | d |
7 | d |
8 | e |
Correlating this stream using SetForCorrelation would change it this way:
Value1 | Value2 | Correlation ids |
---|---|---|
1 | z | dafb56a0-7bfd-482f-8ed2-e47ded1abfe3 |
2 | z | 851267eb-50e0-47f9-b988-e17e75a458d2 |
3 | d | 8af7348a-c004-4c4d-90db-f6fa5213cabb |
4 | d | 4b1ac39f-89c0-426b-a1e2-07c63aebf938 |
5 | e | c6781c10-0f6b-4668-97e6-2788627aa7a4 |
6 | d | 338aabe0-3266-4c2d-9b0e-770b0c0b14dd |
7 | d | c131bb0b-97fa-4c63-86a5-ebcbeb0800f6 |
8 | e | a1f13dfc-f9e4-46af-9e7a-1e64271f1691 |
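Applying a Distinct based on Value2 keeps the first row for each value and merges the correlation ids of the rows it groups together. This gives the following stream as a result: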
Value1 | Value2 | Correlation ids |
---|---|---|
1 | z | dafb56a0-7bfd-482f-8ed2-e47ded1abfe3 , 851267eb-50e0-47f9-b988-e17e75a458d2 |
3 | d | 8af7348a-c004-4c4d-90db-f6fa5213cabb , 4b1ac39f-89c0-426b-a1e2-07c63aebf938 , 338aabe0-3266-4c2d-9b0e-770b0c0b14dd , c131bb0b-97fa-4c63-86a5-ebcbeb0800f6 |
5 | e | c6781c10-0f6b-4668-97e6-2788627aa7a4 , a1f13dfc-f9e4-46af-9e7a-1e64271f1691 |
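The bookkeeping shown in the tables above can be imitated in a few lines of plain C# with LINQ. This is only a minimal sketch under stated assumptions: it does not use ETL.NET at all, the `Row` and `Correlated<T>` types are hypothetical names introduced for illustration, and it assumes C# 9 or later (top-level statements and records). It shows the idea of the mechanism, not how ETL.NET implements it.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// A shortened version of the example rows.
var rows = new[]
{
    new Row(1, "z"), new Row(2, "z"), new Row(3, "d"),
    new Row(4, "d"), new Row(5, "e"),
};

// Imitates what the text says SetForCorrelation does:
// each row gets a list holding one fresh unique identifier.
var correlated = rows
    .Select(r => new Correlated<Row>(r, new List<Guid> { Guid.NewGuid() }))
    .ToList();

// Imitates a Distinct on Value2: keep the first row of each group
// and concatenate the correlation ids of every row in that group.
var distinct = correlated
    .GroupBy(c => c.Payload.Value2)
    .Select(g => new Correlated<Row>(
        g.First().Payload,
        g.SelectMany(c => c.CorrelationIds).ToList()))
    .ToList();

foreach (var d in distinct)
{
    var ids = string.Join(" , ", d.CorrelationIds);
    Console.WriteLine($"{d.Payload.Value1} | {d.Payload.Value2} | {ids}");
}

// Hypothetical types for this sketch only; they are not part of ETL.NET.
record Row(int Value1, string Value2);
record Correlated<T>(T Payload, IReadOnlyList<Guid> CorrelationIds);
```

Each output line carries as many identifiers as there were source rows with that Value2, which is what later makes it possible to find the way back to every original row.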
Then, let's save the payloads of this stream based on the business key (Value2). As the save system retrieves the Id whether or not an occurrence already exists in the database, we get this result:
Value2 | Id | Correlation ids |
---|---|---|
z | 45 | dafb56a0-7bfd-482f-8ed2-e47ded1abfe3 , 851267eb-50e0-47f9-b988-e17e75a458d2 |
d | 69 | 8af7348a-c004-4c4d-90db-f6fa5213cabb , 4b1ac39f-89c0-426b-a1e2-07c63aebf938 , 338aabe0-3266-4c2d-9b0e-770b0c0b14dd , c131bb0b-97fa-4c63-86a5-ebcbeb0800f6 |
e | 13 | c6781c10-0f6b-4668-97e6-2788627aa7a4 , a1f13dfc-f9e4-46af-9e7a-1e64271f1691 |
Now, for each event of the first correlated stream, we can get the related event from our last stream using the operator CorrelateToSingle. For each event of the first stream, this operator tries to find the first event of the second stream whose correlation id list contains one of the ids of the current event. The result is the following:
Value1 | Value2 | Value2Id | Correlation ids |
---|---|---|---|
1 | z | 45 | dafb56a0-7bfd-482f-8ed2-e47ded1abfe3 |
2 | z | 45 | 851267eb-50e0-47f9-b988-e17e75a458d2 |
3 | d | 69 | 8af7348a-c004-4c4d-90db-f6fa5213cabb |
4 | d | 69 | 4b1ac39f-89c0-426b-a1e2-07c63aebf938 |
5 | e | 13 | c6781c10-0f6b-4668-97e6-2788627aa7a4 |
6 | d | 69 | 338aabe0-3266-4c2d-9b0e-770b0c0b14dd |
7 | d | 69 | c131bb0b-97fa-4c63-86a5-ebcbeb0800f6 |
8 | e | 13 | a1f13dfc-f9e4-46af-9e7a-1e64271f1691 |
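The lookup that the text attributes to CorrelateToSingle can be sketched in the same spirit. Again, this is plain C# with LINQ and not ETL.NET code: `Row`, `Saved` and `Correlated<T>` are hypothetical types, the dictionary of ids stands in for a database save (its values are taken from the table above), and C# 9 or later is assumed.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// First stream: every row carries its own single correlation id.
var rowStream = new[]
{
    new Row(1, "z"), new Row(2, "z"), new Row(3, "d"),
    new Row(4, "d"), new Row(5, "e"),
}
    .Select(r => new Correlated<Row>(r, new List<Guid> { Guid.NewGuid() }))
    .ToList();

// Stand-in for the database: the Id the save would return for each Value2.
var fakeDatabaseIds = new Dictionary<string, int> { ["z"] = 45, ["d"] = 69, ["e"] = 13 };

// Second stream: one saved payload per Value2, carrying the merged ids
// of all the rows it came from (Distinct followed by a save).
var savedStream = rowStream
    .GroupBy(c => c.Payload.Value2)
    .Select(g => new Correlated<Saved>(
        new Saved(g.Key, fakeDatabaseIds[g.Key]),
        g.SelectMany(c => c.CorrelationIds).ToList()))
    .ToList();

// For each row, take the first saved payload that shares a correlation id with it.
var joined = rowStream
    .Select(l => new
    {
        l.Payload.Value1,
        l.Payload.Value2,
        Value2Id = savedStream
            .First(r => r.CorrelationIds.Intersect(l.CorrelationIds).Any())
            .Payload.Id,
    })
    .ToList();

foreach (var j in joined)
{
    Console.WriteLine($"{j.Value1} | {j.Value2} | {j.Value2Id}");
}

// Hypothetical types for this sketch only; they are not part of ETL.NET.
record Row(int Value1, string Value2);
record Saved(string Value2, int Id);
record Correlated<T>(T Payload, IReadOnlyList<Guid> CorrelationIds);
```

The match relies only on the correlation ids, never on Value2 itself, so the same lookup keeps working even if the saved payload no longer exposes the value it was grouped on.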
This stream can now be saved in the database as well, using the Id of the previously saved rows as a foreign key.
Practically
Below, the structure of the file to import:
column name | Description |
---|---|
title | Title of the blog post |
author | Author name of the post |
email | Email of the post author |
timestamp | Date and time when the post was submitted |
category | Category of the post |
link | Only for posts with a link: URL of the link |
post | Only for posts with a text: text of the post |
The file would look like the following:
title,author,email,timestamp,category,link,post
FundProcess features,Stéphane Royer,stephane.royer@fundprocess.lu,20210109113005,Category 2,https://www.fundprocess.lu/features/,
ETL.NET revealed,Paillave,admroyer@hotmail.com,20210508181126,Category 2,,"This a post, about ETL.NET"
ETL.NET page,Paillave,admroyer@hotmail.com,20210504164510,Category 1,https://paillave.github.io/Etl.Net/,
FundProcess presentation,Stéphane Royer,stephane.royer@fundprocess.lu,20210203124051,Category 2,,"This a ""post"", about FundProcess"
FundProcess website,Stéphane Royer,stephane.royer@fundprocess.lu,20210106103005,Category 1,http://www.fundprocess.lu,
ETL.NET nuget,Paillave,admroyer@hotmail.com,20200504164510,Category 1,http://www.nuget.org/packages/Etl.Net,
ETL.NET information,Paillave,admroyer@hotmail.com,20200518071024,Category 3,,"This ""another post"" about ETL.NET"
FundProcess information,Stéphane Royer,stephane.royer@fundprocess.lu,20210819124550,Category 1,,This another post about FundProcess
The file must be imported into the following normalized structure:
classDiagram
    class Author {
        Id:int
        Email:string
        Name:string
    }
    class Category {
        Id:int
        Code:string
        Name:string
    }
    class Post {
        <<abstract>>
        Id:int
        DateTime:DateTime
        Title:string
        AuthorId:int
        CategoryId:int?
    }
    class LinkPost {
        Url:Uri
    }
    class TextPost {
        Text:string
    }
    Post --> Author
    Post --> Category
    LinkPost --|> Post
    TextPost --|> Post
The corresponding Entity Framework DbContext is this one:
using System;
using Microsoft.EntityFrameworkCore;

namespace BlogTutorial.DataAccess
{
    public class SimpleTutorialDbContext : DbContext
    {
        private readonly string _connectionString = null;
        public SimpleTutorialDbContext() { }
        public SimpleTutorialDbContext(string connectionString) => _connectionString = connectionString;

        protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)
        {
            optionsBuilder.UseSqlServer(_connectionString ?? @"Server=localhost,1433;Database=BlogTutorial;user=BlogTutorial;password=TestEtl.TestEtl;MultipleActiveResultSets=True");
        }

        protected override void OnModelCreating(ModelBuilder modelBuilder)
        {
            // Author: the unique index on Email is the business key
            var authorBuilder = modelBuilder.Entity<Author>();
            authorBuilder.ToTable(nameof(Author));
            authorBuilder.HasKey(i => i.Id);
            authorBuilder.HasIndex(i => i.Email).IsUnique();
            authorBuilder.Property(i => i.Id).UseIdentityColumn();
            authorBuilder.Property(i => i.Name).IsRequired();
            authorBuilder.Property(i => i.Email).HasMaxLength(250).IsRequired();

            // Category: the unique index on Code is the business key
            var categoryBuilder = modelBuilder.Entity<Category>();
            categoryBuilder.ToTable(nameof(Category));
            categoryBuilder.HasKey(i => i.Id);
            categoryBuilder.HasIndex(i => i.Code).IsUnique();
            categoryBuilder.Property(i => i.Id).UseIdentityColumn();
            categoryBuilder.Property(i => i.Name).IsRequired();
            categoryBuilder.Property(i => i.Code).IsRequired().HasMaxLength(20);

            // Post: unique per author and date, with foreign keys to Author and Category
            var postBuilder = modelBuilder.Entity<Post>();
            postBuilder.ToTable(nameof(Post));
            postBuilder.HasKey(i => i.Id);
            postBuilder.HasIndex(i => new { i.AuthorId, i.DateTime }).IsUnique();
            postBuilder.Property(i => i.Id).UseIdentityColumn();
            postBuilder.HasOne(i => i.Author).WithMany().OnDelete(DeleteBehavior.Restrict).HasForeignKey(i => i.AuthorId);
            postBuilder.HasOne(i => i.Category).WithMany().OnDelete(DeleteBehavior.Restrict).HasForeignKey(i => i.CategoryId);

            // LinkPost and TextPost both derive from Post
            var linkPostBuilder = modelBuilder.Entity<LinkPost>();
            linkPostBuilder.HasBaseType<Post>();
            linkPostBuilder.Property(i => i.Url).IsRequired().HasConversion(
                uri => uri == null ? null : uri.ToString(),
                value => string.IsNullOrWhiteSpace(value) ? null : new Uri(value));

            var textPostBuilder = modelBuilder.Entity<TextPost>();
            textPostBuilder.HasBaseType<Post>();
            textPostBuilder.Property(i => i.Text).IsRequired();

            var executionLogBuilder = modelBuilder.Entity<ExecutionLog>();
            executionLogBuilder.ToTable(nameof(ExecutionLog));
            executionLogBuilder.HasKey(i => i.Id);
            executionLogBuilder.Property(i => i.Id).UseIdentityColumn();
            executionLogBuilder.Property(i => i.EventType).HasMaxLength(250).IsRequired();
            executionLogBuilder.Property(i => i.Message).IsRequired();
        }
    }

    public class Author
    {
        public int Id { get; set; }
        public string Email { get; set; }
        public string Name { get; set; }
    }

    public class Category
    {
        public int Id { get; set; }
        public string Code { get; set; }
        public string Name { get; set; }
    }

    public abstract class Post
    {
        public int Id { get; set; }
        public DateTime DateTime { get; set; }
        public string Title { get; set; }
        public int AuthorId { get; set; }
        public Author Author { get; set; }
        public int? CategoryId { get; set; }
        public Category Category { get; set; }
    }

    public class LinkPost : Post
    {
        public Uri Url { get; set; }
    }

    public class TextPost : Post
    {
        public string Text { get; set; }
    }

    public class ExecutionLog
    {
        public int Id { get; set; }
        public DateTime DateTime { get; set; }
        public Guid ExecutionId { get; set; }
        public string EventType { get; set; }
        public string Message { get; set; }
    }
}
Now, let's apply the theory to this practical situation:
private static void DefineProcess(ISingleStream<string> contextStream)
{
    // Parse every CSV file, then make the resulting stream correlated
    var rowStream = contextStream
        .CrossApplyFolderFiles("list all required files", "*.csv", true)
        .CrossApplyTextFile("parse file", FlatFileDefinition.Create(i => new
        {
            Author = i.ToColumn("author"),
            Email = i.ToColumn("email"),
            TimeSpan = i.ToDateColumn("timestamp", "yyyyMMddHHmmss"),
            Category = i.ToColumn("category"),
            Link = i.ToColumn("link"),
            Post = i.ToColumn("post"),
            Title = i.ToColumn("title"),
        }).IsColumnSeparated(','))
        .SetForCorrelation("set correlation for row");

    // Save one author per distinct email
    var authorStream = rowStream
        .Distinct("remove author duplicates based on emails", i => i.Email)
        .Select("create author instance", i => new Author { Email = i.Email, Name = i.Author })
        .EfCoreSave("save authors", o => o.SeekOn(i => i.Email).AlternativelySeekOn(i => i.Name));

    // Save one category per distinct code
    var categoryStream = rowStream
        .Distinct("remove category duplicates", i => i.Category)
        .Select("create category instance", i => new Category { Code = i.Category, Name = i.Category })
        .EfCoreSave("save categories", o => o.SeekOn(i => i.Code).DoNotUpdateIfExists());

    // Get back the saved category and author of each row, then save the post
    var postStream = rowStream
        .CorrelateToSingle("get related category", categoryStream, (l, r) => new { Row = l, Category = r })
        .CorrelateToSingle("get related author", authorStream, (l, r) => new { l.Row, l.Category, Author = r })
        .Select("create post instance", i => string.IsNullOrWhiteSpace(i.Row.Post)
            ? new LinkPost
            {
                AuthorId = i.Author.Id,
                CategoryId = i.Category.Id,
                DateTime = i.Row.TimeSpan,
                Title = i.Row.Title,
                Url = new Uri(i.Row.Link)
            } as Post
            : new TextPost
            {
                AuthorId = i.Author.Id,
                CategoryId = i.Category.Id,
                DateTime = i.Row.TimeSpan,
                Title = i.Row.Title,
                Text = i.Row.Post
            })
        .EfCoreSave("save posts", o => o.SeekOn(i => new { i.AuthorId, i.DateTime }));
}
note
Depending on the situation, the C# compiler sometimes cannot properly resolve which version of EfCoreSave to use and picks the one for non-correlated streams. If so, use EfCoreSaveCorrelated instead to remove the ambiguity.