Handle traces

Behind the scenes, every operator reports its activity in detail to the runtime. The runtime then broadcasts all of this reported activity to a trace stream.

The following activity is reported to the runtime and issued into the trace stream:

  • The end of the stream, along with the number of events that went through it
  • If applicable, the exception that occurred in the operator
  • If UseDetailedTraces is set, the actual payload/output of the operator

The process that handles this trace stream can be given to the runtime when triggering the execution of the main process:

var processRunner = StreamProcessRunner.Create<string>(DefineProcess);
var executionOptions = new ExecutionOptions<string>
{
    TraceProcessDefinition = (ts, cs) => ts.Do("Show trace on console", t => Console.WriteLine(t.ToString()))
};
var res = await processRunner.ExecuteAsync(args[0], executionOptions);

In practice, the trace process definition is better placed in a dedicated function:

using System.Threading.Tasks;
using Paillave.Etl.Core;

namespace Tutorial
{
    class Program
    {
        static async Task Main(string[] args)
        {
            var processRunner = StreamProcessRunner.Create<string>(DefineProcess);
            var executionOptions = new ExecutionOptions<string>
            {
                TraceProcessDefinition = DefineTraceProcess
            };
            var res = await processRunner.ExecuteAsync(args[0], executionOptions);
        }

        private static void DefineTraceProcess(IStream<TraceEvent> traceStream, ISingleStream<string> contentStream)
        {
            // TODO: define how to process traces here
        }

        private static void DefineProcess(ISingleStream<string> contextStream)
        {
            // TODO: define the process here
        }
    }
}

The trace stream emits TraceEvent payloads, structured as follows:

classDiagram
    class TraceEvent {
        SequenceId:int
        ExecutionId:Guid
        JobName:string
        DateTime:DateTime
        NodeName:string
        NodeTypeName:string
    }
    class ITraceContent {
        <<interface>>
        Type:string
        Message:string
    }
    class TraceLevel {
        <<enumeration>>
        Off
        Error
        Warning
        Info
        Verbose
    }
    class StreamTraceContentBase {
        <<abstract>>
    }
    class CounterSummaryStreamTraceContent {
        Level => TraceLevel.Info;
        Counter:int
    }
    class RowProcessStreamTraceContent {
        Level => TraceLevel.Verbose;
        Position:int
        AverageDuration:int?
        Row:object
    }
    class UnhandledExceptionStreamTraceContent {
        Level => TraceLevel.Error;
        Exception:Exception
    }
    TraceEvent-->ITraceContent:Content
    ITraceContent-->TraceLevel:Level
    StreamTraceContentBase--|>ITraceContent
    CounterSummaryStreamTraceContent--|>StreamTraceContentBase
    RowProcessStreamTraceContent--|>StreamTraceContentBase
    UnhandledExceptionStreamTraceContent--|>StreamTraceContentBase
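Since the three content types carry different data, a trace process will usually branch on the concrete type of the content. The sketch below illustrates that idea with small stand-in classes that only mirror the diagram above; they are not the library's own definitions, and the `TraceFormatter.Describe` helper is a hypothetical name. It also assumes that TraceLevel behaves like `System.Diagnostics.TraceLevel`, which has the same five members.

```csharp
using System;
using System.Diagnostics; // TraceLevel: Off, Error, Warning, Info, Verbose

// Stand-in types mirroring the class diagram; NOT the library's own definitions.
public interface ITraceContent
{
    TraceLevel Level { get; }
}

public sealed class CounterSummaryStreamTraceContent : ITraceContent
{
    public TraceLevel Level => TraceLevel.Info;
    public int Counter { get; set; }
}

public sealed class RowProcessStreamTraceContent : ITraceContent
{
    public TraceLevel Level => TraceLevel.Verbose;
    public int Position { get; set; }
    public object Row { get; set; }
}

public sealed class UnhandledExceptionStreamTraceContent : ITraceContent
{
    public TraceLevel Level => TraceLevel.Error;
    public Exception Exception { get; set; }
}

public sealed class TraceEvent
{
    public string NodeName { get; set; }
    public ITraceContent Content { get; set; }
}

// Hypothetical helper, introduced for illustration only.
public static class TraceFormatter
{
    // Builds a one-line description by matching on the concrete content type.
    public static string Describe(TraceEvent trace)
    {
        switch (trace.Content)
        {
            case CounterSummaryStreamTraceContent summary:
                return $"[{summary.Level}] {trace.NodeName}: stream ended after {summary.Counter} event(s)";
            case RowProcessStreamTraceContent row:
                return $"[{row.Level}] {trace.NodeName}: row #{row.Position} = {row.Row}";
            case UnhandledExceptionStreamTraceContent error:
                return $"[{error.Level}] {trace.NodeName}: {error.Exception.Message}";
            default:
                return $"{trace.NodeName}: unrecognized trace content";
        }
    }
}
```

With the real types, a function of this shape could presumably be called from the action given to Do, for instance to print `Describe(t)` instead of `t.ToString()`.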

Events whose content is of type RowProcessStreamTraceContent are not emitted by default, for performance reasons. To receive them, the UseDetailedTraces flag must be set:

var processRunner = StreamProcessRunner.Create<string>(DefineProcess);
var executionOptions = new ExecutionOptions<string>
{
    TraceProcessDefinition = (ts, cs) => ts.Do("Show trace on console", t => Console.WriteLine(t.ToString())),
    UseDetailedTraces = true
};
var res = await processRunner.ExecuteAsync(args[0], executionOptions);
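With detailed traces enabled, the trace stream carries Verbose row events in addition to the Info summaries and Error exceptions, so a trace process may want to keep only events up to a chosen level. The following is a minimal sketch of such a check. It assumes the TraceLevel of a trace content is ordered like `System.Diagnostics.TraceLevel` (Off < Error < Warning < Info < Verbose); `TraceFilter.ShouldKeep` is a hypothetical helper, not part of the library.

```csharp
using System.Diagnostics; // TraceLevel: Off = 0, Error, Warning, Info, Verbose

// Hypothetical helper, introduced for illustration only.
public static class TraceFilter
{
    // Returns true when a trace at `level` should be kept for the given `threshold`.
    // A threshold of Off keeps nothing; a threshold of Verbose keeps every real level.
    public static bool ShouldKeep(TraceLevel level, TraceLevel threshold)
        => level != TraceLevel.Off && level <= threshold;
}
```

Such a check could be applied inside the action passed to Do, for example to print a trace only when its content level passes the threshold.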

A trace process can do anything a regular process can, including saving data to a database. The target database for logging may not be the same as the working database, and the same goes for anything else that is injected: if the dependency injection context of the trace process needs to differ from that of the main process, it must be provided through TraceResolver.

var processRunner = StreamProcessRunner.Create<string>(DefineProcess);
using (var cnx = new SqlConnection(args[1]))       // working database
using (var tracesCnx = new SqlConnection(args[2])) // logging database
{
    cnx.Open();
    // Assumption: the trace connection must be open as well, mirroring cnx
    tracesCnx.Open();
    var executionOptions = new ExecutionOptions<string>
    {
        TraceProcessDefinition = DefineTraceProcess,
        // Dependencies available to the main process
        Resolver = new SimpleDependencyResolver().Register(cnx),
        // Dependencies available to the trace process
        TraceResolver = new SimpleDependencyResolver().Register(tracesCnx),
    };
    var res = await processRunner.ExecuteAsync(args[0], executionOptions);
    Console.Write(res.Failed ? "Failed" : "Succeeded");
}

A concrete example of a trace process is described in the related tutorial.