
kustortininja

Sending Data Factory Telemetry to Fabric Real-Time Intelligence

When building Data Factory pipelines in Fabric Data Factory, Azure Data Factory, or Synapse Data Pipelines, understanding performance and troubleshooting errors is crucial. Even though some observability features are available out of the box, there will always be a need for deeper insights during development, testing, and production. Traditionally, this meant sending diagnostic data to Azure Storage, Azure Log Analytics, or Azure Event Hubs, which often required additional configuration and integration.

Fortunately, Fabric Real-Time Intelligence makes gaining a richer understanding of our environment much easier. This article demonstrates how to send telemetry data to an Eventhouse in Microsoft Fabric, part of the Fabric Real-Time Intelligence platform. With the data in an Eventhouse, we can build real-time dashboards and alerts with far less integration work than before. In a real implementation, you decide what and how much to track, from simple pipeline start/stop events to capturing the output of every activity in your pipeline.

 

Creating the Table

First, create an Eventhouse in Microsoft Fabric and then create a table called "RawDataFactoryEvents" with a single column of the dynamic data type:

 

.create table RawDataFactoryEvents (telemetry: dynamic)

 

The dynamic data type in KQL lets us ingest the payload as-is and resolve its schema at query time, giving us the flexibility to handle varying schemas without specifying every column/data type combination upfront. It also protects the developer if the vendor (i.e., Microsoft) changes the system schema.
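
To illustrate what this buys us, here is a minimal sketch that ingests a small, made-up payload and pulls properties out of it at query time with dot notation; the property names are invented for illustration, and no column for them ever has to be declared on the raw table:

//ingest a made-up payload into the dynamic column (for illustration only)
.set-or-append RawDataFactoryEvents <|
    print telemetry = dynamic({"rowsCopied": 42, "copyDuration": 7})

//extract properties at query time instead of at ingestion time
RawDataFactoryEvents
| extend rowsCopied = toint(telemetry.rowsCopied), copyDuration = toint(telemetry.copyDuration)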

Setting Up the Pipeline

Next, return to the pipeline and add a KQL activity. Connect it to the output of the copy activity, configure the connection to point to the Eventhouse you just created, and set the command to append the data to the table:

 

[Screenshot: pipeline canvas with the copy activity connected to the KQL activity]

 

 

.set-or-append RawDataFactoryEvents <|

    let pipeOutput = parse_json('@{activity('Copy data1').output}');

    print pipeOutput

 

This command uses Kusto's ingest-from-query capability to append the output of the copy activity to the RawDataFactoryEvents table.
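
For context, Data Factory resolves the @{activity('Copy data1').output} expression before the command is sent, so what actually reaches the Eventhouse looks roughly like the following. The JSON payload here is a simplified, invented stand-in for a real copy activity output:

//approximation of the resolved command; the payload is invented and heavily trimmed
.set-or-append RawDataFactoryEvents <|
    let pipeOutput = parse_json('{"rowsRead": 100, "rowsCopied": 100, "filesWritten": 1, "copyDuration": 12}');
    print pipeOutput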

Running the Pipeline

Running this pipeline starts sending telemetry into the RawDataFactoryEvents table we created earlier. Next, we want to take the schema that's written and extract the fields we want to report on, leveraging Kusto's efficient operators for querying JSON. Once the query looks the way we like, we can turn it into an update policy, which runs whenever data is ingested into the source table and automatically transforms it into a more tabular, reporting-friendly structure. Here is some sample code that shows how this might work:

 

//start by creating the base query for the fields I want to see
RawDataFactoryEvents
| evaluate bag_unpack(telemetry)
| project copyDuration, errors, filesWritten, rowsCopied, rowsRead, executionDetails
| mv-expand executionDetails
| extend sourceType=executionDetails.source.type, sinkType=executionDetails.sink.type, Status=executionDetails.status, start=executionDetails.start, queueDuration=executionDetails.detailedDurations.queuingDuration, transferDuration=executionDetails.detailedDurations.transferDuration, transferDurationTimeToFirstByteFromSource=executionDetails.detailedDurations.timeToFirstByte
| project-away executionDetails

//make this a function so we can make it an update policy
.create-or-alter function tupDataFactoryCopyActivities() {
    RawDataFactoryEvents
    | evaluate bag_unpack(telemetry)
    | project copyDuration, errors, filesWritten, rowsCopied, rowsRead, executionDetails
    | mv-expand executionDetails
    | extend sourceType=executionDetails.source.type, sinkType=executionDetails.sink.type, Status=executionDetails.status, start=executionDetails.start, queueDuration=executionDetails.detailedDurations.queuingDuration, transferDuration=executionDetails.detailedDurations.transferDuration, transferDurationTimeToFirstByteFromSource=executionDetails.detailedDurations.timeToFirstByte
    | project-away executionDetails
    | project start=todatetime(start), Status=tostring(Status), sourceType=tostring(sourceType), sinkType=tostring(sinkType), rowsCopied=toint(rowsCopied), rowsRead=toint(rowsRead), filesWritten=toint(filesWritten), copyDuration=toint(copyDuration), queueDuration=toint(queueDuration), transferDuration=toint(transferDuration), transferDurationTimeToFirstByteFromSource=toint(transferDurationTimeToFirstByteFromSource), errors=todynamic(errors)
}

//create the target table where the formatted data will be stored
.create table DataFactoryCopyActivities (
    start: datetime,
    Status: string,
    sourceType: string,
    sinkType: string,
    rowsCopied: int,
    rowsRead: int,
    filesWritten: int,
    copyDuration: int,
    queueDuration: int,
    transferDuration: int,
    transferDurationTimeToFirstByteFromSource: int,
    errors: dynamic
)

//attach the update policy to the table
.alter table DataFactoryCopyActivities policy update
```
[
    {
        "IsEnabled": true,
        "Source": "RawDataFactoryEvents",
        "Query": "tupDataFactoryCopyActivities()",
        "IsTransactional": false,
        "PropagateIngestionProperties": false
    }
]
```


//check the final table
DataFactoryCopyActivities
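
If you want to double-check that the policy landed on the table before triggering another pipeline run, it can be inspected directly; a fresh run should then populate DataFactoryCopyActivities automatically:

//verify the update policy is attached to the target table
.show table DataFactoryCopyActivities policy update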

 

 

Leveraging the Data

The data is now available in its final form. With it, we can build real-time dashboards for operational insight into our environment, always working from the latest data. We can also create Activator alerts that notify us when Tier 1, 2, or 3 pipelines fail, or leverage advanced KQL features, such as multivariate anomaly detection, to find anomalies in our pipeline runs. Since this is all out-of-the-box functionality, it is much easier to implement than the traditional approaches, saving significant effort and cost.
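
As an example, once a few days of history have accumulated, a lightweight starting point is univariate anomaly detection on copy duration with series_decompose_anomalies (the multivariate functions require more setup). This is a sketch only; the 14-day window, 1-hour bin, and 2.5 threshold are illustrative assumptions, not recommendations:

//flag hours where average copy duration deviates sharply from its recent pattern
DataFactoryCopyActivities
| where start > ago(14d)
| make-series avgDuration = avg(copyDuration) default = 0 on start step 1h
| extend anomalyFlags = series_decompose_anomalies(avgDuration, 2.5)
| mv-expand start to typeof(datetime), avgDuration to typeof(real), anomalyFlags to typeof(int)
| where anomalyFlags != 0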

 

Learn More

Eager to learn more? Explore the full range of capabilities in Fabric Real-Time Intelligence and discover how it can make your data analytics estate event-driven.