Series 1: Hotstar Autoscaler built with Azure OpenAI

We have all seen enough posts about "Game Changer" 😅 plugins built on top of the GPT models. I can assure you this won't be just another one of those. Join me in this series to learn about some cool things you can build with these models in your real-time applications. (Definitely not a Game Changer! 😉)

The objectives of this blog are to:

  • Stimulate fresh ideas by exploring the logical and reasoning capabilities of GPT models with Azure OpenAI.
  • Ensure anyone from a development background can follow along with the entire series. (I'm not a Data Scientist myself 😉)

I assume most of us have watched this video, where Gaurav Kamboj, Cloud Architect at Hotstar, explains why traditional autoscaling doesn't work for Hotstar and shares how Hotstar created a global record by live streaming to 25.3 million concurrent viewers.

Well, this is gonna be our problem statement for this entire series. While there is no doubt that Hotstar's custom autoscaler works perfectly fine, our objective here is to expand upon the problem they initially addressed and explore how GPT models could offer innovative solutions for these unique use cases. Hotstar's approach was:

  • Traffic-based scaling for services that expose metrics about the number of requests being processed
  • Ladder-based scaling for services that do not expose these metrics

Hotstar defined these scaling ladder configurations as a mapping from expected viewer concurrency to pre-computed instance counts for each service; the original chart is shown in their blog.

For more details refer to this blog : https://blog.hotstar.com/scaling-for-tsunami-traffic-2ec290c37504
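To make the shape of such a ladder concrete, here is a minimal, made-up example of a single rung, written as the kind of document the scaling ladder container holds later in this setup (the concurrency level and instance counts are illustrative placeholders, not Hotstar's actual numbers):

{
    "id": "ladder-10M",
    "concurrency": "10M",
    "services": [
        { "name": "video-service", "instances": 120 },
        { "name": "recommendation-service", "instances": 40 }
    ]
}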

Some of the key observations from the above chart:

  • Unusual spikes happen based on the sentiments of the viewers
  • On day two, when Dhoni came in to bat, live user concurrency rose from 16M to 25M+ users
  • After the fall of his wicket, there was a sudden drop in traffic (25M to 1M users)

What if these sentiments were tracked by our GPT models, which could then send a signal to increase or decrease the instance count of our services?

Let's understand the different components of this architecture.

  • For simplicity, I have considered 3 services for the Hotstar application.
    • Commentary Service: To simulate the ball-by-ball commentary, I have created an Azure Durable Function which picks up each commentary entry at a fixed interval and pushes it as a message to an Azure Service Bus queue.
    • Video service: The video service streams the live game, and for that I have created a virtual machine scale set. Both steady traffic and sudden spikes are common in this service, as it is driven by the sentiment of the game.
    • Recommendation service: As with the video service, we have a virtual machine scale set for the recommendation service, which gets hit when everyone presses the back button or returns to the home screen after a key player's wicket has been taken.
  • Azure Cosmos DB: I have created 2 containers, ipl_match to hold the match details and ipl_scaling_ladder to hold the scaling ladder configuration.
  • Azure Prompt flow: We deploy the prompt flow as a real-time endpoint and consume it in an Azure Logic App.
  • Azure Logic App: Acts as the Prompt flow invoker and, based on the output, dynamically changes the instance count of the video service and recommendation service.

We have loaded the match details for Ind vs NZ into the ipl_match container. It contains the Playing XI details along with the key players defined for this particular match, which are then used in our prompt. This document also holds the current number of concurrent users. A rough sketch of how such a document might be seeded is shown below.
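As an illustration only, a minimal seeding script along these lines would work; the field values are made-up placeholders, and only the overall shape (id, concurrentUsers, teams with key players) matters, since those are the fields the prompt flow tools read later.

# Minimal sketch for seeding the ipl_match container (placeholder account, key and values;
# only the document shape matters, since get_match_data reads concurrentUsers and teams).
from azure.cosmos import CosmosClient

client = CosmosClient(url="https://<cosmos-account>.documents.azure.com:443/", credential="<key>")
container = client.get_database_client("<database>").get_container_client("ipl_match")

match_doc = {
    "id": "1",
    "concurrentUsers": "16M",
    "teams": [
        {"name": "India", "playingXI": ["..."], "keyPlayers": ["Dhoni", "Jadeja"]},
        {"name": "New Zealand", "playingXI": ["..."], "keyPlayers": ["Williamson", "Boult"]}
    ]
}
container.upsert_item(body=match_doc)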

  1. First we simulate the commentary of the match through our Commentary Service (Azure Durable Functions), which pushes messages to the Azure Service Bus queue at a 40-second interval.
  2. Next, the Azure Logic App listens to the Azure Service Bus queue at a 1-second interval. It parses the message content from the queue and extracts the matchId and commentary.
  3. The parsed output is then passed as an input to our Azure Prompt flow via an HTTP POST call.
  4. The matchId is then passed on to a Python tool
    4.1. which uses it to fetch the match details from Azure Cosmos DB.
  5. The concurrent users value from the previous step is then passed on to the next Python tool
    5.1. which uses it to fetch the scaling ladder configuration from Azure Cosmos DB.
  6. Now we use all the inputs from the previous 3 steps to form the system prompt.
  7. The GPT model acts upon the prompt we provided and outputs JSON in the requested format, which is then received by the Azure Logic App.
  8. & 9. Based on the output, we either increase or decrease the instance count of the virtual machine scale sets for the video and recommendation services (a rough sketch of this scaling action follows this list).

Pre-requisites

  • Azure subscription
  • Azure OpenAI subscription
  • Azure Machine Learning studio
  • Click on the Create button in Flows and select Standard flow
  • Create connections for OpenAI and Cosmos as below
  • The connection for Azure OpenAI is predefined in the connectors; however, for the Cosmos connection, choose Custom
  • Now we need a custom environment to install the Azure Cosmos DB dependencies on the compute instance
  • Provide a name for the custom environment and select "Create a new docker context" in the environment source dropdown
  • In the next wizard step, configure it as below
  • Click the Create button in the final wizard step
  • Once the custom environment is created, make a note of the Azure Container Registry path
  • Now create the environment.yaml file as below
$schema: https://azuremlschemas.azureedge.net/latest/environment.schema.json
name: env-hoststar-autoscaler
image: d7a6feac73964dcba70d0a9ab016d8a8.azurecr.io/azureml/azureml_d28214de704a0e200df331e553a4f8c9
inference_config:
  liveness_route:
    port: 8080
    path: /health
  readiness_route:
    port: 8080
    path: /health
  scoring_route:
    port: 8080
    path: /score
  • Now create the environment using the Azure CLI command below

az ml environment create --file environment.yaml --resource-group my-resource-group --workspace-name my-workspace
  • Create a compute instance, which is required for creating a Runtime in Prompt flow.
  • The next step is to create a Runtime on top of the custom environment we created, via the Azure CLI command.
  • Choose the runtime created in the previous step in the flow
  • Create two parameters for inputs and outputs as below
from promptflow import tool
from promptflow.connections import CustomConnection
from azure.cosmos import CosmosClient


# Function to fetch the match document and extract the fields used by the prompt
@tool
def get_match_data(matchId: str, conn: CustomConnection):
    # Initialize the Cosmos DB client from the custom connection
    client = CosmosClient(url=conn.endpoint, credential=conn.key)
    database = client.get_database_client(conn.database_name)
    container = database.get_container_client(conn.container_name)

    # Look up the match document by its id
    query = f"SELECT * FROM c WHERE c.id = '{matchId}'"
    result = list(container.query_items(query, enable_cross_partition_query=True))
    print(result)
    if result:
        match_data = result[0]
        # Only the fields consumed downstream in the prompt are returned
        required_data = {
            "concurrentUsers": match_data["concurrentUsers"],
            "teams": match_data["teams"]
        }
        return required_data
    else:
        return None

from promptflow import tool
from promptflow.connections import CustomConnection
from azure.cosmos import CosmosClient


# Function to fetch the scaling ladder entry for the current concurrency level
@tool
def get_scaling_ladder(concurrentUsers: str, conn: CustomConnection):
    # Initialize the Cosmos DB client from the custom connection
    client = CosmosClient(url=conn.endpoint, credential=conn.key)
    database = client.get_database_client(conn.database_name)
    container = database.get_container_client('ipl-scaling-ladder')

    # Look up the ladder rung matching the current concurrency bucket
    query = f"SELECT * FROM c WHERE c.concurrency = '{concurrentUsers}'"
    result = list(container.query_items(query, enable_cross_partition_query=True))
    print(result)
    if result:
        scaling_ladder = result[0]
        required_data = {
            "concurrentUsers": scaling_ladder["concurrency"],
            "services": scaling_ladder["services"]
        }
        return required_data
    else:
        return None
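If you want to sanity-check these two tools outside the flow, a rough local test could look like the sketch below. Treat it as an assumption to verify: the CustomConnection constructor shown here may differ between promptflow versions, and the endpoint, key, and database names are placeholders.

# Rough local sanity check for the two Cosmos DB tools (placeholder values; verify
# the CustomConnection constructor against the promptflow version you installed).
from promptflow.connections import CustomConnection

conn = CustomConnection(
    configs={
        "endpoint": "https://<cosmos-account>.documents.azure.com:443/",
        "database_name": "<database>",
        "container_name": "ipl_match",
    },
    secrets={"key": "<cosmos-primary-key>"},
)

# Assuming both tool functions are importable in the same session
match = get_match_data("1", conn)
ladder = get_scaling_ladder(match["concurrentUsers"], conn)
print(match)
print(ladder)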
  • In the system prompt we set the behavior of the model
  • Once that is done, we break it down into 3 parts:
    1. We define the output JSON format
    2. We loop through the key players of the match from both teams
    3. We provide recommendations for the GPT model to follow (a rough, illustrative sketch of such a prompt is shown below)
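Purely as an illustration of that three-part structure (this is not the exact prompt from the blog; the player names come from this example match and the JSON shape is an assumption), such a system prompt could look roughly like:

You are an autoscaling assistant for a live-streaming platform. You are given the
latest ball-by-ball commentary, the current number of concurrent users, the key
players of both teams, and a scaling ladder that maps concurrency levels to
instance counts per service.

1. Always answer with JSON in this format:
   {"videoService": {"action": "increase|decrease|none", "instances": <count>},
    "recommendationService": {"action": "increase|decrease|none", "instances": <count>}}
2. Key players in this match: India - Dhoni, Jadeja; New Zealand - Williamson, Boult.
3. Recommendations: when a key player comes in to bat, expect a surge in viewers and
   scale the video service to the next rung of the ladder; when a key player is
   dismissed, expect viewers to drop back to the home screen, so scale up the
   recommendation service and scale down the video service.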
  • Based on the prompt, the model was able to provide almost the right suggestions, except for a mismatch in the increase-instance flag for the video service.
  • We can, however, fine-tune the model to provide more accurate results. I will try to cover this in the next blog of this series.
  • Choose the Deploy button; it will pop up a wizard
  • In the first step, provide the endpoint name and authentication type (key-based) and click the Next button
  • In the next step, choose the outputs to include in the endpoint response
  • After that, verify the connection details provided in the Azure Prompt flow; if needed, modify the connections for your specific environment
  • Choose a compute instance
  • Click the Deploy button
  • Add the AzureML Data Scientist RBAC role to the created endpoint so it can access the workspace
  • Make a note of the key and the URL so we can invoke it from Azure Logic Apps (an illustrative invocation from code is sketched below)
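This blog invokes the endpoint from the Logic App's HTTP action; for reference, an equivalent call from Python could look like the sketch below. The endpoint URL, key, and input field names are placeholders and must match whatever your deployment and flow inputs actually are.

# Illustrative call to the deployed prompt flow endpoint (placeholder URL and key;
# the payload field names must match the inputs defined in your flow).
import requests

ENDPOINT_URL = "https://<endpoint-name>.<region>.inference.ml.azure.com/score"
API_KEY = "<endpoint-key>"

payload = {
    "matchId": "1",
    "commentary": "Ferguson to Chahal, no run, very full and outside off..."
}
headers = {
    "Content-Type": "application/json",
    "Authorization": f"Bearer {API_KEY}",
}

response = requests.post(ENDPOINT_URL, json=payload, headers=headers)
response.raise_for_status()
print(response.json())  # the scaling decision JSON produced by the flow

With the endpoint in place, the last missing piece is the Commentary Service itself; the Azure Durable Functions code below simulates the ball-by-ball commentary and publishes each entry to the Service Bus queue.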

using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.DurableTask;
using Microsoft.Extensions.Logging;

namespace Hotstar.Autoscaler
{
    public static class CommentaryOrchestration
    {
        [FunctionName("Commentary_SchedulerJob")]
        public static async Task CommentarySchedulerJob(
            [TimerTrigger("0 40 02 * * *")] TimerInfo timer,
            [DurableClient] IDurableOrchestrationClient starter,
            ILogger log)
        {
            log.LogInformation("CommentaryScheduler Cron job fired!");

            string instanceId = await starter.StartNewAsync(nameof(ProcessCommentaryOrchestration));
            log.LogInformation($"Started new instance with ID = {instanceId}.");

            DurableOrchestrationStatus status;
            while (true)
            {
                status = await starter.GetStatusAsync(instanceId);
                log.LogInformation($"Status: {status.RuntimeStatus}, Last update: {status.LastUpdatedTime}.");

                if (status.RuntimeStatus == OrchestrationRuntimeStatus.Completed ||
                    status.RuntimeStatus == OrchestrationRuntimeStatus.Failed ||
                    status.RuntimeStatus == OrchestrationRuntimeStatus.Terminated)
                {
                    break;
                }

                // Wait a bit between status checks instead of polling in a tight loop
                await Task.Delay(10000);
            }

            log.LogInformation($"Output: {status.Output}");
        }
        [FunctionName("ProcessCommentaryOrchestration")]
        public static async Task ProcessCommentaryOrchestration(
        [OrchestrationTrigger] IDurableOrchestrationContext context,
        ILogger log)
        {
            // Hard-coded commentary entries standing in for a live commentary feed
            List<MatchCommentary> commentaryList = GetCommentaryList();

            foreach (var commentary in commentaryList)
            {
                // Publish one ball's commentary, then wait before sending the next one
                await context.CallActivityAsync("PublishToServiceBusQueue", commentary);
                await context.CreateTimer(context.CurrentUtcDateTime.AddSeconds(20), CancellationToken.None);
            }
        }

        [FunctionName("PublishToServiceBusQueue")]
        public static async Task PublishToServiceBusQueue(
           [ActivityTrigger] MatchCommentary commentary,
           [ServiceBus("%ServiceBusQueue%", Connection = "ServiceBusConnection")] IAsyncCollector<MatchCommentary> messageQueue,
           ILogger log)
        {
            await messageQueue.AddAsync(commentary);
            log.LogInformation($"Published to Queue: {commentary}");
        }

        private static List<MatchCommentary> GetCommentaryList()
        {
            return new List<MatchCommentary>
        {
            new MatchCommentary{
            matchId ="1",
            commentary= "Boult to Jadeja, out Caught by Williamson!! Massive wicket! NZ needed a wicket and his main strike bowler has done precisely that. Slower ball and for once, Jadeja didn't quite pick it as he went across the line. A bit too early and skewed it straight up in the air. Never easy, these pressure catches but it's the ice-cool KW at mid-off who settles under it calmly. Exceptional knock from Jadeja but he has to go. NZ in the driver's seat. Jadeja c Williamson b Boult 77(59) [4s-4 6s-4]"
            },
            new MatchCommentary{
                matchId ="1",
                commentary="Ferguson to Chahal, no run, very full and outside off, squeezed out towards mid-off"
            },
            new MatchCommentary{
                matchId="1",
                commentary="Ferguson to Chahal, 1 run, full and angling into Chahal who flicks it away towards deep square leg"    
            }
        };
        }

        public class MatchCommentary
        {
            public string commentary { get; set; }
            public string matchId { get; set; }
        }
    }

}
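The %ServiceBusQueue% binding expression and the ServiceBusConnection name above are resolved from the function app's application settings; when running locally, that means a local.settings.json along these lines (the queue name and connection string are placeholders, filled in from the next two steps):

{
  "IsEncrypted": false,
  "Values": {
    "AzureWebJobsStorage": "UseDevelopmentStorage=true",
    "FUNCTIONS_WORKER_RUNTIME": "dotnet",
    "ServiceBusQueue": "<queue-name>",
    "ServiceBusConnection": "Endpoint=sb://<namespace>.servicebus.windows.net/;SharedAccessKeyName=<policy>;SharedAccessKey=<key>"
  }
}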
  • Create a Service Bus namespace and a queue under it
  • Copy the keys and use them in the above commentary service to establish the connection
  • Create virtual machine scale sets for both the video and recommendation services
  • Run the Commentary service to kick off the simulated Hotstar environment
  • Special thanks to Nikhil Sehgal, who, despite being the CEO and founder of a company, took the time to address the questions and uncertainties I had.

  • I encountered a few inaccurate results in the Logic App runs, with some outputs not aligning with our expectations. Therefore, in the upcoming blog, we will explore the process of fine-tuning these models to provide more precise and reliable results.

  • Following that, let’s explore how to pre-warm these instances under specific scenarios. For instance, we can monitor the recent commentary history to assess the current batsman’s performance. If there’s a high probability that the current batsman will be dismissed soon, and the next batsman is a key player, we can strategically pre-warm the instances in advance.

  • Additionally, we can introduce other models like Llama2 and evaluate their results and performance in comparison to GPT models.

If you have any other topics you would like covered as part of this series, please do let me know in the comments.