MiSchroe
Frequent Visitor

Performance issue with Spark structured streaming job

Hi,

 

We are facing performance issues when running a Spark Job Definition with a Structured Streaming job that reads data from Kafka. We just want to dump the Kafka data into a Lakehouse for further processing in later steps.

 

Our issue is that we have an average latency of around 30 seconds between an event becoming visible in Kafka and the time it is picked up by the Spark job. It seems that the overall micro-batch processing time is the main issue, and I don't know why it takes so long. I have already tried different cluster sizes, but because we are just passing the data through, the cluster size doesn't really matter. At a minimum, the number of vCores should match the number of Kafka partitions, which in our case is 6.
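
One way to narrow down where the micro-batch time goes is to look at the per-phase timings that Structured Streaming reports for each batch. The sketch below assumes a progress record shaped like the dictionary PySpark returns from `StreamingQuery.lastProgress`; the helper name `slowest_phases` and all sample numbers are hypothetical and are not measurements from this job.

```python
from typing import Any, Dict, List, Tuple


def slowest_phases(progress: Dict[str, Any], top: int = 3) -> List[Tuple[str, int]]:
    """Return the `top` longest phases of one micro-batch as (name, ms) pairs."""
    durations = progress.get("durationMs", {})
    # triggerExecution is the total for the whole batch, so it is left out of the ranking.
    phases = {name: ms for name, ms in durations.items() if name != "triggerExecution"}
    return sorted(phases.items(), key=lambda item: item[1], reverse=True)[:top]


# Hypothetical progress record, shaped like query.lastProgress (values are made up).
sample_progress = {
    "batchId": 42,
    "numInputRows": 300,
    "durationMs": {
        "addBatch": 21000,
        "latestOffset": 4000,
        "queryPlanning": 500,
        "walCommit": 3000,
        "triggerExecution": 29000,
    },
}

for name, ms in slowest_phases(sample_progress):
    print(f"{name}: {ms} ms")
```

If a phase such as `addBatch` or `walCommit` dominates, the time is likely spent writing data or checkpoint files rather than reading from Kafka. Because `awaitTermination()` blocks, the progress record would have to be read from a separate monitoring loop or from the driver log.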

 

I took the same source code and ran it in a Databricks notebook, where I get an average latency of 0.6 seconds, so it's not an issue on the source side.

 

Here is our source code:

 

import sys
import os
from notebookutils import mssparkutils
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
import pyspark.sql.functions as f
import pyspark.sql.types as t


if __name__ == "__main__":

    #Spark session builder
    #Disable delta.optimizeWrite to ensure data is written as fast as possible
    spark = (SparkSession
          .builder
          .appName("test")
          .config("spark.microsoft.delta.optimizeWrite.enabled", "false")
          .config("spark.sql.parquet.vorder.enabled", "false")
          .config("spark.ms.autotune.enabled", "true")
          .getOrCreate())
    
    spark_context = spark.sparkContext
    spark_context.setLogLevel("DEBUG")



    print("spark.synapse.pool.name : " + spark.conf.get("spark.synapse.pool.name")) 
    print() 
    print("spark.driver.cores : " + spark.conf.get("spark.driver.cores")) 
    print("spark.driver.memory : " + spark.conf.get("spark.driver.memory")) 
    print("spark.executor.cores : " + spark.conf.get("spark.executor.cores")) 
    print("spark.executor.memory : " + spark.conf.get("spark.executor.memory")) 
    print("spark.executor.instances: " + spark.conf.get("spark.executor.instances")) 
    print() 
    print("spark.dynamicAllocation.enabled : " + spark.conf.get("spark.dynamicAllocation.enabled")) 
    print("spark.dynamicAllocation.maxExecutors : " + spark.conf.get("spark.dynamicAllocation.maxExecutors")) 
    print("spark.dynamicAllocation.minExecutors : " + spark.conf.get("spark.dynamicAllocation.minExecutors")) 

    checkpoint_path = "Files/checkpoint/path/to/checkpoint/location"

    # Works with API key and secret
    df = (
        spark
        .readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "<my kafka server>:9092")
        .option("kafka.security.protocol", "SASL_SSL")
        .option("kafka.sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='{}' password='{}';".format("<my kafka api key>", "<my kafka api secret>"))
        .option("kafka.ssl.endpoint.identification.algorithm", "https")
        .option("kafka.sasl.mechanism", "PLAIN")
        .option("subscribe", "<my kafka topic>")
        .option("startingOffsets", "earliest")
        .option("failOnDataLoss", "false")
        .load()
    )

    # Add metadata to the dataframe, e.g. current timestamp and convert key and value to string
    df = df.withColumns({"key": f.col("key").cast("string"), "value": f.col("value").cast("string"), "timestamp_datalake": f.current_timestamp()})

    df_stream = (
        df.writeStream
        .format("delta")
        .option("checkpointLocation", checkpoint_path)
        .option("mergeSchema", "true")
        .outputMode("append")
        .toTable("my_test_table")
    )
    
    df_stream.awaitTermination()

 

 

This is a sample of the Spark UI:

[Screenshot: MiSchroe_0-1722251930376.png]

BTW: Importing the same data from Kafka into a KQL database with an Eventstream takes even longer, with an average latency of ~40 seconds.

 

Any help is much appreciated.

 

Michael

2 REPLIES
MiSchroe
Frequent Visitor

Hi @v-huijiey-msft,

 

thanks for your answer.

 

I have implemented your suggestions and here are the results:

  1. Spark executor heartbeat interval -> no change.
  2. Increased the driver and 3 executors to X-Large nodes, each with 32 cores and 224 GB of memory -> no change.
  3. Increase the number of Kafka partitions: Kafka is an external system, so I can't change that. There are 6 partitions on that Kafka topic. (And it runs perfectly with this setup in Databricks, so the number of partitions is not the issue here.)
  4. Resize the executor: see #2 -> no change.

The topic produces ~10 messages per second, each ~3 kB in size. Compared to what is mentioned in the linked blog entry, this is a drop in the ocean.
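
To put those numbers in perspective, here is a rough back-of-the-envelope calculation. It uses only the figures from this thread (10 messages per second, 3 kB per message, 6 partitions) and assumes decimal units and an even spread across partitions, which may not hold in practice.

```python
# Figures taken from the thread; all are approximate.
MESSAGES_PER_SECOND = 10
MESSAGE_SIZE_KB = 3
PARTITIONS = 6

# Total ingest rate across the topic.
throughput_kb_s = MESSAGES_PER_SECOND * MESSAGE_SIZE_KB

# Assumes messages are spread evenly across partitions.
per_partition_kb_s = throughput_kb_s / PARTITIONS

# Daily volume in decimal gigabytes (1 GB = 1,000,000 kB).
daily_gb = throughput_kb_s * 86_400 / 1_000_000

print(f"throughput: {throughput_kb_s} kB/s")
print(f"per partition: {per_partition_kb_s} kB/s")
print(f"per day: {daily_gb:.3f} GB")
```

At roughly 30 kB/s in total, raw data volume is unlikely to explain a 30-second latency on its own, which is consistent with cluster size making no difference.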

 

Regards,

Michael

v-huijiey-msft
Community Support

Hi @MiSchroe ,

 

Given that the same code runs efficiently on Databricks, the problem may be in the configuration or environment of the current setup.

 

I have the following suggestions for performance tuning:

 

  1. Increase spark.executor.heartbeatInterval from 10 seconds to 20 seconds.
  2. Increase driver and executor memory.
  3. Increase parallelism by increasing the number of partitions in Kafka.
  4. Size the executors correctly.
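
As a minimal sketch, the configuration-based suggestions above could be collected in one place and applied to the session builder before the session starts. Only the heartbeat value comes from this reply; the memory values and the names `TUNING_CONF` and `apply_conf` are hypothetical, and whether a Fabric Spark Job Definition honors these settings may depend on the pool and environment configuration.

```python
from typing import Any, Dict

# Settings to try. Only the heartbeat value is taken from the suggestion above.
TUNING_CONF: Dict[str, str] = {
    # Spark's default is 10s; it should stay well below spark.network.timeout.
    "spark.executor.heartbeatInterval": "20s",
    # Hypothetical memory sizes, to be adapted to the node size actually in use.
    "spark.driver.memory": "56g",
    "spark.executor.memory": "56g",
}


def apply_conf(builder: Any, conf: Dict[str, str]) -> Any:
    """Apply each key/value pair via builder.config() and return the builder."""
    for key, value in conf.items():
        builder = builder.config(key, value)
    return builder


# Intended usage (requires pyspark, so it is left as a comment here):
#   from pyspark.sql import SparkSession
#   spark = apply_conf(SparkSession.builder.appName("test"), TUNING_CONF).getOrCreate()
```

Keeping the settings in a dictionary makes it easy to change one value at a time and compare the resulting micro-batch durations.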

 

More details on performance tuning can be found in this article:

Performance Tuning of an Apache Kafka/Spark Streaming System | HPE Developer Portal

 

Best Regards,
Yang
Community Support Team

 

If any post helps, please consider accepting it as the solution to help other members find it more quickly.
If I have misunderstood your needs or you still have problems, please feel free to let us know. Thanks a lot!
