Skip to main content

Normalize a flat structure

In traditional ETL, normalizing flat inputs (meaning dispatching one row into several tables, with one table that makes reference to the other one.s) is typically a duty that is far to be straight forward. This is extremely surprising to say the least, as this is one of the purposes of the "T" (Transform) in the acronym "ETL".

ETL.NET has a very clear and straight forward out of the box tools to handle this very usual pattern when it is about to import flat structures such as files. This mechanism is so straight forward that it seems almost magical!

Principle

This normalization is possible thanks to two features:

  • A way to know the original row/rows that are at the source of the current row
  • The capacity to make a Select/Update/Insert for each row to save with Paillave.EtlNet.EntityFrameworkCore or Paillave.EtlNet.SqlServer

The normalization pattern lie on the possibility to know what was the initial row or rows that permitted to result to a payload.

Behind the scenes, each row has a list of unique identifiers linked to it, and each operator is supposed to know how to combine these uids depending on its purpose. When a stream is issuing payloads that have a list of unique identifiers under the hood, it is called a correlated stream. Out of the box, streams are not correlated for performance and memory purpose. For a stream to be correlated, it must pass through a dedicated operator using SetForCorrelation. This operator simply attributes a list that contains a different single unique identifier to each row. Every operator of the core of ETL.NET knows how to handle correlated stream and issues a proper correlated stream regarding to its logic.

For example, let's imagine this sequence of events:

Value1Value2
1z
2z
3d
4d
5e
6d
7d
8e

Correlating this stream using SetForCorrelation would change it this way:

Value1Value2Correlation ids
1zdafb56a0-7bfd-482f-8ed2-e47ded1abfe3
2z851267eb-50e0-47f9-b988-e17e75a458d2
3d8af7348a-c004-4c4d-90db-f6fa5213cabb
4d4b1ac39f-89c0-426b-a1e2-07c63aebf938
5ec6781c10-0f6b-4668-97e6-2788627aa7a4
6d338aabe0-3266-4c2d-9b0e-770b0c0b14dd
7dc131bb0b-97fa-4c63-86a5-ebcbeb0800f6
8ea1f13dfc-f9e4-46af-9e7a-1e64271f1691

Making a Distinct based on Value2 would give this stream as a result:

Value1Value2Correlation ids
1zdafb56a0-7bfd-482f-8ed2-e47ded1abfe3, 851267eb-50e0-47f9-b988-e17e75a458d2
3d8af7348a-c004-4c4d-90db-f6fa5213cabb, 4b1ac39f-89c0-426b-a1e2-07c63aebf938, 338aabe0-3266-4c2d-9b0e-770b0c0b14dd, c131bb0b-97fa-4c63-86a5-ebcbeb0800f6
5ec6781c10-0f6b-4668-97e6-2788627aa7a4, a1f13dfc-f9e4-46af-9e7a-1e64271f1691

Then, let's save payloads of this stream based on the business key (Value2). As the save system gets the Id wether an occurrence already exists in the database or not, we get this result:

Value2IdCorrelation ids
z45dafb56a0-7bfd-482f-8ed2-e47ded1abfe3, 851267eb-50e0-47f9-b988-e17e75a458d2
d698af7348a-c004-4c4d-90db-f6fa5213cabb, 4b1ac39f-89c0-426b-a1e2-07c63aebf938, 338aabe0-3266-4c2d-9b0e-770b0c0b14dd, c131bb0b-97fa-4c63-86a5-ebcbeb0800f6
e13c6781c10-0f6b-4668-97e6-2788627aa7a4, a1f13dfc-f9e4-46af-9e7a-1e64271f1691

Now, for each event of the first correlated stream, we can get the related event from our last stream using the operator CorrelateToSingle. This operator, for each event of the first stream, tries to find the first event of the second stream that contains a correlation key that exists in the current correlation key list. The result is the following:

Value1Value2Value2IdCorrelation ids
1z45dafb56a0-7bfd-482f-8ed2-e47ded1abfe3
2z45851267eb-50e0-47f9-b988-e17e75a458d2
3d698af7348a-c004-4c4d-90db-f6fa5213cabb
4d694b1ac39f-89c0-426b-a1e2-07c63aebf938
5e13c6781c10-0f6b-4668-97e6-2788627aa7a4
6d69338aabe0-3266-4c2d-9b0e-770b0c0b14dd
7d69c131bb0b-97fa-4c63-86a5-ebcbeb0800f6
8e13a1f13dfc-f9e4-46af-9e7a-1e64271f1691

And now, this can be saved in the database as well by making reference to the foreign key of rows that were previously saved.

Practically

Below, the structure of the file to import:

column nameDescription
titleTitle of the blog post
authorAuthor name of the post
emailEmail of the post author
timestampDate and time when the post was submitted
categoryCategory of the post
linkOnly for post with a link: url of the link
postOnly for a post with a text: text of the post

The file would look like the following:

post.csv
title,author,email,timestamp,category,link,post
FundProcess features,Stéphane Royer,stephane.royer@fundprocess.lu,20210109113005,Category 2,https://www.fundprocess.lu/features/,
ETL.NET revealed,Paillave,admroyer@hotmail.com,20210508181126,Category 2,,"This a post, about ETL.NET"
ETL.NET page,Paillave,admroyer@hotmail.com,20210504164510,Category 1,https://paillave.github.io/Etl.Net/,
FundProcess presentation,Stéphane Royer,stephane.royer@fundprocess.lu,20210203124051,Category 2,,"This a ""post"", about FundProcess"
FundProcess website,Stéphane Royer,stephane.royer@fundprocess.lu,20210106103005,Category 1,http://www.fundprocess.lu,
ETL.NET nuget,Paillave,admroyer@hotmail.com,20200504164510,Category 1,http://www.nuget.org/packages/Etl.Net,
ETL.NET information,Paillave,admroyer@hotmail.com,20200518071024,Category 3,,"This ""another post"" about ETL.NET"
FundProcess information,Stéphane Royer,stephane.royer@fundprocess.lu,20210819124550,Category 1,,This another post about FundProcess

The normalized structure where this file must be imported is this one:

classDiagram class Author { Id:int Email:string Name:string } class Category { Id:int Code:string Name:string } class Post { <<abstract>> Id:int DateTime:DateTime Title:string AuthorId:int CategoryId:int? } class LinkPost { Url:Uri } class TextPost { Text:string } Post --> Author Post --> Category LinkPost --|> Post TextPost --|> Post

The corresponding Entity Framework DbContext is this one:

SimpleDbContext.cs
using System;
using Microsoft.EntityFrameworkCore;

namespace BlogTutorial.DataAccess
{
public class SimpleDbContext : DbContext
{
private readonly string _connectionString = null;
public SimpleTutorialDbContext() { }
public SimpleTutorialDbContext(string connectionString) => _connectionString = connectionString;
protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)
{
optionsBuilder.UseSqlServer(_connectionString ?? @"Server=localhost,1433;Database=BlogTutorial;user=BlogTutorial;password=TestEtl.TestEtl;MultipleActiveResultSets=True");
}
protected override void OnModelCreating(ModelBuilder modelBuilder)
{
var authorBuilder = modelBuilder.Entity<Author>();
authorBuilder.ToTable(nameof(Author));
authorBuilder.HasKey(i => i.Id);
authorBuilder.HasIndex(i => i.Email).IsUnique();
authorBuilder.Property(i => i.Id).UseIdentityColumn();
authorBuilder.Property(i => i.Name).IsRequired();
authorBuilder.Property(i => i.Email).HasMaxLength(250).IsRequired();

var categoryBuilder = modelBuilder.Entity<Category>();
categoryBuilder.ToTable(nameof(Category));
categoryBuilder.HasKey(i => i.Id);
categoryBuilder.HasIndex(i => i.Code).IsUnique();
categoryBuilder.Property(i => i.Id).UseIdentityColumn();
categoryBuilder.Property(i => i.Name).IsRequired();
categoryBuilder.Property(i => i.Code).IsRequired().HasMaxLength(20);

var postBuilder = modelBuilder.Entity<Post>();
postBuilder.ToTable(nameof(Post));
postBuilder.HasKey(i => i.Id);
postBuilder.HasIndex(i => new { i.AuthorId, i.DateTime }).IsUnique();
postBuilder.Property(i => i.Id).UseIdentityColumn();
postBuilder.HasOne(i => i.Author).WithMany().OnDelete(DeleteBehavior.Restrict).HasForeignKey(i => i.AuthorId);
postBuilder.HasOne(i => i.Category).WithMany().OnDelete(DeleteBehavior.Restrict).HasForeignKey(i => i.CategoryId);

var linkPostBuilder = modelBuilder.Entity<LinkPost>();
linkPostBuilder.HasBaseType<Post>();
linkPostBuilder.Property(i => i.Url).IsRequired().HasConversion(
uri => uri == null ? null : uri.ToString(),
value => string.IsNullOrWhiteSpace(value) ? null : new Uri(value));

var textPostBuilder = modelBuilder.Entity<TextPost>();
textPostBuilder.HasBaseType<Post>();
textPostBuilder.Property(i => i.Text).IsRequired();

var executionLogBuilder = modelBuilder.Entity<ExecutionLog>();
executionLogBuilder.ToTable(nameof(ExecutionLog));
executionLogBuilder.HasKey(i => i.Id);
executionLogBuilder.Property(i => i.Id).UseIdentityColumn();
executionLogBuilder.Property(i => i.EventType).HasMaxLength(250).IsRequired();
executionLogBuilder.Property(i => i.Message).IsRequired();
}
}
public class Author
{
public int Id { get; set; }
public string Email { get; set; }
public string Name { get; set; }
}
public class Category
{
public int Id { get; set; }
public string Code { get; set; }
public string Name { get; set; }
}
public abstract class Post
{
public int Id { get; set; }
public DateTime DateTime { get; set; }
public string Title { get; set; }
public int AuthorId { get; set; }
public Author Author { get; set; }
public int? CategoryId { get; set; }
public Category Category { get; set; }
}
public class LinkPost : Post
{
public Uri Url { get; set; }
}
public class TextPost : Post
{
public string Text { get; set; }
}
public class ExecutionLog
{
public int Id { get; set; }
public DateTime DateTime { get; set; }
public Guid ExecutionId { get; set; }
public string EventType { get; set; }
public string Message { get; set; }
}
}

Now, let's apply the theory in this practical situation:

private static void DefineProcess(ISingleStream<string> contextStream)
{
var rowStream = contextStream
.CrossApplyFolderFiles("list all required files", "*.csv", true)
.CrossApplyTextFile("parse file", FlatFileDefinition.Create(i => new
{
Author = i.ToColumn("author"),
Email = i.ToColumn("email"),
TimeSpan = i.ToDateColumn("timestamp", "yyyyMMddHHmmss"),
Category = i.ToColumn("category"),
Link = i.ToColumn("link"),
Post = i.ToColumn("post"),
Title = i.ToColumn("title"),
}).IsColumnSeparated(','))
.SetForCorrelation("set correlation for row");

var authorStream = rowStream
.Distinct("remove author duplicates based on emails", i => i.Email)
.Select("create author instance", i => new Author { Email = i.Email, Name = i.Author })
.EfCoreSave("save authors", o => o.SeekOn(i => i.Email).AlternativelySeekOn(i => i.Name));

var categoryStream = rowStream
.Distinct("remove category duplicates", i => i.Category)
.Select("create category instance", i => new Category { Code = i.Category, Name = i.Category })
.EfCoreSave("save categories", o => o.SeekOn(i => i.Code).DoNotUpdateIfExists());

var postStream = rowStream
.CorrelateToSingle("get related category", categoryStream, (l, r) => new { Row = l, Category = r })
.CorrelateToSingle("get related author", authorStream, (l, r) => new { l.Row, l.Category, Author = r })
.Select("create post instance", i => string.IsNullOrWhiteSpace(i.Row.Post)
? new LinkPost
{
AuthorId = i.Author.Id,
CategoryId = i.Category.Id,
DateTime = i.Row.TimeSpan,
Title = i.Row.Title,
Url = new Uri(i.Row.Link)
} as Post
: new TextPost
{
AuthorId = i.Author.Id,
CategoryId = i.Category.Id,
DateTime = i.Row.TimeSpan,
Title = i.Row.Title,
Text = i.Row.Post
})
.EfCoreSave("save posts", o => o.SeekOn(i => new { i.AuthorId, i.DateTime }));
}
note

It can happen sometimes that, depending on the situation, C# compiler cannot resolve properly which version of EfCoreSave to use and takes the one for non correlated streams. If so, use EfCoreSaveCorrelated instead to prevent the compiler to be confused.