Data processing

ETL.NET is a .NET framework for implementing fast, low-memory-footprint, and easy-to-maintain data processes with minimal effort. Built-in tooling for normalization, upserts, lookups, and joins dramatically reduces the work required for any import or transformation task. Tracing and error tracking are handled automatically for the developer.

Read or write any file type and any data source

  • Native SQL server
  • Entity Framework
  • CSV
  • Excel
  • Bloomberg response files
  • Searchable PDF
  • XML
  • Anything .NET can read or write whatsoever

Read or write files on any source

  • File system
  • FTP
  • SFTP
  • FTPS
  • Dropbox
  • eMail and MailBox
  • zip archives
  • Anything .NET can access whatsoever

Powered by & for .NET

ETL.NET is fully written in .NET for multi-platform usage and straightforward integration into any application. Extending it takes five minutes... literally.

Easy to implement

ETL.NET works on a principle similar to SSIS, with ETL processes written in .NET like LINQ queries.
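
For instance, a process is a fluent chain of named operators applied to a stream. Here is a minimal sketch reusing the operators shown in the Quick Start further down; the file pattern and column names are purely illustrative:

private static void DefineProcess(ISingleStream<string> contextStream)
{
    contextStream
        // list every CSV file of the input folder, recursively
        .CrossApplyFolderFiles("list input files", "*.csv", true)
        // parse each file into typed rows
        .CrossApplyTextFile("parse file", FlatFileDefinition.Create(i => new
        {
            Email = i.ToColumn("email")
        }).IsColumnSeparated(','))
        // keep one row per email
        .Distinct("deduplicate on email", i => i.Email)
        // serialize the result as a file value and write it to disk
        .ToTextFileValue("write output", "emails.csv", FlatFileDefinition.Create(i => new
        {
            Email = i.ToColumn("email")
        }).IsColumnSeparated(','))
        .WriteToFile("save output file", i => i.Name);
}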

Easy to run

A simple and straightforward ETL.NET runtime for .NET executes ETL processes with no installation required.
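
For example, hosting and running a process from a console application looks like the sketch below (same pattern as the Quick Start that follows, assuming the process has no external dependency such as a SQL connection to resolve):

static async Task Main(string[] args)
{
    // build a runner from the process definition, then execute it with the input folder path
    var runner = StreamProcessRunner.Create<string>(DefineProcess);
    var res = await runner.ExecuteAsync(args[0], new ExecutionOptions<string>());
    Console.WriteLine(res.Failed ? "Failed" : "Succeeded");
}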

Very Quick Start ⚡

Create the project

dotnet new console -o MyFirstEtl
cd MyFirstEtl
dotnet add package Paillave.EtlNet.Core
dotnet add package Paillave.EtlNet.FileSystem
dotnet add package Paillave.EtlNet.Zip
dotnet add package Paillave.EtlNet.TextFile
dotnet add package Paillave.EtlNet.SqlServer

Create and call the ETL

using System;
using System.Threading.Tasks;
using Paillave.Etl.Core;
using Paillave.Etl.FileSystem;
using Paillave.Etl.Zip;
using Paillave.Etl.TextFile;
using Paillave.Etl.SqlServer;
using System.Data.SqlClient;
using System.Linq;

namespace SimpleTutorial
{
    class Program
    {
        static async Task Main(string[] args)
        {
            var processRunner = StreamProcessRunner.Create<string>(DefineProcess);
            using (var cnx = new SqlConnection(args[1]))
            {
                cnx.Open();
                var executionOptions = new ExecutionOptions<string>
                {
                    Resolver = new SimpleDependencyResolver().Register(cnx)
                };
                var res = await processRunner.ExecuteAsync(args[0], executionOptions);
                Console.Write(res.Failed ? "Failed" : "Succeeded");
                if (res.Failed)
                    Console.Write($"{res.ErrorTraceEvent.NodeName}({res.ErrorTraceEvent.NodeTypeName}):{res.ErrorTraceEvent.Content.Message}");
            }
        }
        private static void DefineProcess(ISingleStream<string> contextStream)
        {
            contextStream
                .CrossApplyFolderFiles("list all required files", "*.zip", true)
                .CrossApplyZipFiles("extract files from zip", "*.csv")
                .CrossApplyTextFile("parse file", FlatFileDefinition.Create(i => new Person
                {
                    Email = i.ToColumn("email"),
                    FirstName = i.ToColumn("first name"),
                    LastName = i.ToColumn("last name"),
                    DateOfBirth = i.ToDateColumn("date of birth", "yyyy-MM-dd"),
                    Reputation = i.ToNumberColumn<int?>("reputation", ".")
                }).IsColumnSeparated(','))
                .Distinct("exclude duplicates based on the Email", i => i.Email)
                .SqlServerSave("upsert using Email as key and ignore the Id", o => o
                    .ToTable("dbo.Person")
                    .SeekOn(p => p.Email)
                    .DoNotSave(p => p.Id))
                .Select("define row to report", i => new { i.Email, i.Id })
                .ToTextFileValue("write summary to file", "report.csv", FlatFileDefinition.Create(i => new
                {
                    Email = i.ToColumn("Email"),
                    Id = i.ToNumberColumn<int>("new or existing Id", ".")
                }).IsColumnSeparated(','))
                .WriteToFile("save log file", i => i.Name);
        }
        private class Person
        {
            public int Id { get; set; }
            public string Email { get; set; }
            public string FirstName { get; set; }
            public string LastName { get; set; }
            public DateTime DateOfBirth { get; set; }
            public int? Reputation { get; set; }
        }
    }
}
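
To try it out, create a dbo.Person table in the target database with columns matching the Person class above (typically with Id as the identity key), then run the program with the folder to scan and the connection string as arguments. The path and connection string below are placeholders:

dotnet run -- "C:\data\in" "Server=localhost;Database=MyDb;Integrated Security=True"

The process scans the folder recursively for *.zip archives, parses every *.csv they contain, upserts the rows into dbo.Person keyed on Email, and writes a report.csv summary file.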

Run it, debug it, track it, log it 🎶

Execute an ETL process, debug it by tracking debug events with the IDE debugger, catch execution events, and log them into a database.

using System;
using System.Threading.Tasks;
using Paillave.Etl.Core;
using Paillave.Etl.FileSystem;
using Paillave.Etl.Zip;
using Paillave.Etl.TextFile;
using Paillave.Etl.SqlServer;
using System.Data.SqlClient;

namespace SimpleTutorial
{
    class Program
    {
        static async Task Main(string[] args)
        {
            var processRunner = StreamProcessRunner.Create<string>(DefineProcess);
            processRunner.DebugNodeStream += (sender, e) => { /* PLACE A CONDITIONAL BREAKPOINT HERE FOR DEBUG */ };
            using (var cnx = new SqlConnection(args[1]))
            {
                cnx.Open();
                var executionOptions = new ExecutionOptions<string>
                {
                    Resolver = new SimpleDependencyResolver().Register(cnx),
                    TraceProcessDefinition = DefineTraceProcess,
                    // UseDetailedTraces = true // activate only if per-row traces are meant to be caught
                };
                var res = await processRunner.ExecuteAsync(args[0], executionOptions);
                Console.Write(res.Failed ? "Failed" : "Succeeded");
                if (res.Failed)
                    Console.Write($"{res.ErrorTraceEvent.NodeName}({res.ErrorTraceEvent.NodeTypeName}):{res.ErrorTraceEvent.Content.Message}");
            }
        }
        private static void DefineProcess(ISingleStream<string> contextStream)
        {
            // TODO: define your ETL process here
        }
        private static void DefineTraceProcess(IStream<TraceEvent> traceStream, ISingleStream<string> contentStream)
        {
            traceStream
                .Where("keep only summary of node and errors", i => i.Content is CounterSummaryStreamTraceContent || i.Content is UnhandledExceptionStreamTraceContent)
                .Select("create log entry", i => new ExecutionLog
                {
                    DateTime = i.DateTime,
                    ExecutionId = i.ExecutionId,
                    EventType = i.Content switch
                    {
                        CounterSummaryStreamTraceContent => "EndOfNode",
                        UnhandledExceptionStreamTraceContent => "Error",
                        _ => "Unknown"
                    },
                    Message = i.Content switch
                    {
                        CounterSummaryStreamTraceContent counterSummary => $"{i.NodeName}: {counterSummary.Counter}",
                        UnhandledExceptionStreamTraceContent unhandledException => $"{i.NodeName}({i.NodeTypeName}): [{unhandledException.Level}] {unhandledException.Message}",
                        _ => "Unknown"
                    }
                })
                .SqlServerSave("save traces", o => o.ToTable("dbo.ExecutionTrace"));
        }
        private class ExecutionLog
        {
            public DateTime DateTime { get; set; }
            public Guid ExecutionId { get; set; }
            public string EventType { get; set; }
            public string Message { get; set; }
        }
    }
}
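
Note that the trace process above assumes a dbo.ExecutionTrace table whose columns match the ExecutionLog class (DateTime, ExecutionId, EventType, Message). DefineProcess is left empty here on purpose: plug in any process definition, such as the one from the Quick Start, and each node's end-of-node summary and any unhandled error will be logged to that table.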

Normalize it 🎶

Dispatch rows from a flat file into several tables to normalize the data, thanks to the correlation mechanism.

private static void DefineProcess(ISingleStream<string> contextStream)
{
    var rowStream = contextStream
        .CrossApplyFolderFiles("list all required files", "*.csv", true)
        .CrossApplyTextFile("parse file", FlatFileDefinition.Create(i => new
        {
            Author = i.ToColumn("author"),
            Email = i.ToColumn("email"),
            TimeSpan = i.ToDateColumn("timestamp", "yyyyMMddHHmmss"),
            Category = i.ToColumn("category"),
            Link = i.ToColumn("link"),
            Post = i.ToColumn("post"),
            Title = i.ToColumn("title"),
        }).IsColumnSeparated(','))
        .SetForCorrelation("set correlation for row");

    var authorStream = rowStream
        .Distinct("remove author duplicates based on emails", i => i.Email)
        .Select("create author instance", i => new Author { Email = i.Email, Name = i.Author })
        .EfCoreSaveCorrelated("save or update authors", o => o
            .SeekOn(i => i.Email)
            .AlternativelySeekOn(i => i.Name));

    var categoryStream = rowStream
        .Distinct("remove category duplicates", i => i.Category)
        .Select("create category instance", i => new Category { Code = i.Category, Name = i.Category })
        .EfCoreSaveCorrelated("insert categories if doesn't exist, get it otherwise", o => o
            .SeekOn(i => i.Code)
            .DoNotUpdateIfExists());

    var postStream = rowStream
        .CorrelateToSingle("get related category", categoryStream, (l, r) => new { Row = l, Category = r })
        .CorrelateToSingle("get related author", authorStream, (l, r) => new { l.Row, l.Category, Author = r })
        .Select("create post instance", i => string.IsNullOrWhiteSpace(i.Row.Post)
            ? new LinkPost
            {
                AuthorId = i.Author.Id,
                CategoryId = i.Category.Id,
                DateTime = i.Row.TimeSpan,
                Title = i.Row.Title,
                Url = new Uri(i.Row.Link)
            } as Post
            : new TextPost
            {
                AuthorId = i.Author.Id,
                CategoryId = i.Category.Id,
                DateTime = i.Row.TimeSpan,
                Title = i.Row.Title,
                Text = i.Row.Post
            })
        .EfCoreSaveCorrelated("save or update posts", o => o
            .SeekOn(i => new { i.AuthorId, i.DateTime }));
}
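
The Author, Category, Post, LinkPost, and TextPost entity classes are not shown on this page. A plausible shape, inferred from the calls above (the EF Core DbContext and mapping configuration are omitted), could be:

// Inferred from the process above; adjust to the real EF Core model.
public class Author
{
    public int Id { get; set; }
    public string Email { get; set; }
    public string Name { get; set; }
}
public class Category
{
    public int Id { get; set; }
    public string Code { get; set; }
    public string Name { get; set; }
}
public abstract class Post
{
    public int Id { get; set; }
    public int AuthorId { get; set; }
    public int CategoryId { get; set; }
    public DateTime DateTime { get; set; }
    public string Title { get; set; }
}
public class LinkPost : Post
{
    public Uri Url { get; set; }
}
public class TextPost : Post
{
    public string Text { get; set; }
}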