
nielsvdc
Advocate I

Experiments and parallel processing going wrong

We created a notebook that does revenue predictions per location using MLflow and PySpark. (Yes, later we might use pandas.)

The code is something like the following; forgive me if it is not completely correct.

In the code you see that for each location we run 14 iterations, feeding the predicted revenue back in to fine-tune the predictions. This process works to our liking.

When we run this process sequentially in a for-each loop, everything works fine.

What we want to do is use ThreadPoolExecutor to run the predictions for the locations in parallel, creating one experiment per location to record the process. The problem we run into is that predictions are sometimes saved to the experiments of other locations, and runs even end up nested inside runs of other locations. Does anyone know how to prevent this from happening?

 

import mlflow
from datetime import datetime
from pyspark.sql import DataFrame
from pyspark.ml.pipeline import PipelineModel
from concurrent.futures import ThreadPoolExecutor

class LocationPrediction:
    def __init__(self, location_name, pipeline_model):
        self.location_name = location_name
        self.pipeline_model = pipeline_model
        self.df_with_predictions: DataFrame = None
        self.iteration = 0
        self.get_data_from_lakehouse()

    def get_data_from_lakehouse(self):
        # `spark` is the SparkSession provided by the notebook runtime
        self.initial_data = spark.read.format("delta").table("table_name").filter(f"location = '{self.location_name}'")

    def predict(self):
        # Start a child iteration run
        with mlflow.start_run(run_name=f"Iteration_{self.iteration}", nested=True):
            # Feed the previous iteration's predictions back in; fall back to
            # the initial data on the first pass (the original referenced an
            # undefined `self.data`)
            input_df = self.df_with_predictions if self.df_with_predictions is not None else self.initial_data
            predictions = self.pipeline_model.transform(input_df)
            mlflow.log_metric("row_count", predictions.count())

        # ...
        # Post-process the prediction DataFrame here
        # ...
        self.df_with_predictions = predictions

    def write_to_lakehouse(self):
        self.df_with_predictions.write.format("delta").mode("append").saveAsTable("table_name")

    # Use new predictions to predict again
    def do_iteration(self):
        for i in range(14):
            self.predict()
            self.iteration += 1
        self.write_to_lakehouse()

def get_pipeline_model(location_name) -> PipelineModel:
    model_uri = f"models:/{location_name}/latest"
    model = mlflow.spark.load_model(model_uri)
    return model

def run_prediction_task(location_name):
    # Create or set Fabric experiment and start main run
    mlflow.set_experiment(location_name)
    run_timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
    mlflow.start_run(run_name=f"Prediction_{run_timestamp}")

    pipeline_model = get_pipeline_model(location_name)
    pipeline = LocationPrediction(location_name, pipeline_model)
    pipeline.do_iteration()

    mlflow.end_run()

if __name__ == "__main__":
    locations = ["location_1", "location_2", "location_3", "location_4", "location_5", "location_6"]
    with ThreadPoolExecutor(max_workers=3) as executor:
        futures = [executor.submit(run_prediction_task, location) for location in locations]

 

1 ACCEPTED SOLUTION
v-prasare
Community Support

@nielsvdc

Thanks for reaching out to MS Fabric community support.

 

Your code is mostly on track, but a few key changes are needed to ensure the MLflow experiments and runs are correctly isolated when using ThreadPoolExecutor. Specifically, you need to make sure that:

  1. Each location's prediction process is contained within a separate MLflow experiment.
  2. The nested runs (for each iteration) are correctly managed and don't interfere with other locations.
  3. No shared state is accessed concurrently in a way that could cause the wrong experiment or run to be affected.

Please consider the improvements below:

  1. Set Experiment Properly: Calling mlflow.set_experiment(location_name) per location is the right idea, but note that it changes process-wide state: if several threads call it concurrently, a run can be started under whichever experiment was set last. Inside threads it is safer to resolve the experiment once and pass experiment_id directly to mlflow.start_run().

  2. Unique Run Names: You're using run_name=f"Prediction_{run_timestamp}" to uniquely identify the main run. That’s great! This will ensure each location has its own main run.

  3. Manage Iteration Runs: Avoid nested=True in mlflow.start_run() if you want fully independent runs (and the iteration runs don't need to be nested within the main run). If nesting is required, you can keep nested=True, but be aware that it attaches the child to the currently active run, and depending on the MLflow version that active-run state may be shared across threads. That is exactly where cross-location nesting can come from when execution is parallelized.

  4. DataFrame Management: Ensure that your df_with_predictions isn't being modified by multiple threads simultaneously. Each thread should work with its own version of the data.

  5. MLflow Context in Threads: Make sure that each thread correctly creates and manages its own experiment and run context.
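To make points 3 and 5 concrete, one option is to bypass the active-run mechanism entirely and address every run by its run_id through a client object. The sketch below is an illustration, not MLflow itself: `FakeClient` is a stand-in so the routing logic can run anywhere, but `mlflow.tracking.MlflowClient` exposes the same `create_run` / `log_metric` / `set_terminated` calls used here, and MLflow records nesting via the `mlflow.parentRunId` tag.

```python
# Sketch: explicit run addressing, so nothing depends on a shared
# "active run" stack. FakeClient and run_prediction_task are
# illustrative names, not part of MLflow.
import threading
from concurrent.futures import ThreadPoolExecutor

class FakeClient:
    """Stand-in for mlflow.tracking.MlflowClient: records which run
    each metric lands in, so routing can be checked without a server."""
    def __init__(self):
        self._lock = threading.Lock()
        self._counter = 0
        self.metrics = {}  # run_id -> [(key, value), ...]

    def create_run(self, experiment_id, tags=None):
        with self._lock:
            self._counter += 1
            run_id = f"{experiment_id}-run{self._counter}"
            self.metrics[run_id] = []
        return run_id

    def log_metric(self, run_id, key, value):
        with self._lock:
            self.metrics[run_id].append((key, value))

    def set_terminated(self, run_id):
        pass  # the real client marks the run FINISHED here

def run_prediction_task(client, location_name, iterations=3):
    # One parent run per location; each child names its parent
    # explicitly instead of relying on thread-local context.
    parent_id = client.create_run(experiment_id=location_name)
    for i in range(iterations):
        child_id = client.create_run(
            experiment_id=location_name,
            tags={"mlflow.parentRunId": parent_id},  # MLflow's nesting tag
        )
        client.log_metric(child_id, "iteration", i)
        client.set_terminated(child_id)
    client.set_terminated(parent_id)
    return parent_id

client = FakeClient()
locations = ["location_1", "location_2", "location_3"]
with ThreadPoolExecutor(max_workers=3) as executor:
    parents = list(executor.map(
        lambda loc: run_prediction_task(client, loc), locations))
```

Because every `log_metric` call names its run_id, threads can interleave freely without metrics leaking into another location's experiment; swapping `FakeClient` for the real `MlflowClient` keeps the same structure.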

You can find more information on managing MLflow experiments and runs in the official MLflow documentation.

 

 

Thanks,

Prashanth Are

MS Fabric community support


3 REPLIES

Thanks @v-prasare. We are looking further into using NotebookUtils.notebook.RunMultiple(), as it gives us better insight into what the process was for each location in the notebook instance, by providing a link to the notebook snapshots afterwards.

@nielsvdc, Thank you for the update!
It sounds like using NotebookUtils.notebook.RunMultiple() is a great choice, especially since it provides better insights and visibility into the process for each location in the notebook instance. The ability to access notebook snapshots afterwards will definitely help with tracking and debugging.

 

 

If this post helps, please consider accepting it as the solution so other members can find it more quickly, and give kudos if it helped you resolve your query.

 

Thanks,

Prashanth Are

MS Fabric community support

 

 

 

 
