Store Your Event Streams in Azure Event Hubs

Scenario:

Suppose you have a CRM system where you manage all your customer relationships, with thousands of users connected to it. You need an activity feed panel on each user's home page where they can see the activities happening in their company.

For example:

  • A new appointment has been added with a customer I am working with
  • A new document has been added

So you need to capture all the events triggered in your CRM system, process them, and store them so they can be shown in the user's activity feed panel.


Problem to Solve:

Thousands of events will occur every second, so we need a messaging service to store the events, one that scales automatically with the event load and supports both batch processing and parallel processing.


Solution:

There are many storage options in Azure that could solve this problem. Picking the one that suits best is the tricky part. Let's compare a few of the available options and pick the best.

1. Azure Service Bus

Azure Service Bus mainly serves enterprise applications, passing messages for transaction processing, ordering, and workflows. It provides reliable asynchronous message delivery (enterprise messaging as a service), which requires polling.

2. Azure Event Hubs

Azure Event Hubs is a big data pipeline. It facilitates the capture, retention, and replay of telemetry and event stream data. Azure Application Insights also uses Azure Event Hubs as the messaging service for its telemetry data.


So we have found our solution: Azure Event Hubs meets the needs of this scenario.


Simple Architecture to solve this flow.

image



Let's see, step by step, how to add sample data to the event hub using a sample function, and then process it and store it in Cosmos DB.

Step 1: Go to the Azure Portal and create an event hub

  • Go to the Azure Portal
  • Go to Create a resource
  • Search for Event Hubs and create an event hub

image

Step 2: Create an Azure Function App

  • Go to the Azure Portal
  • Go to Create a resource
  • Search for Function App and create a Function App

image

Step 3: Create a Timer Trigger function to push some data into the event hub

  • Go to Visual Studio and create a Timer Trigger function

image

  • Add the Microsoft.Azure.EventHubs NuGet package to your project
  • Add the following code to your function class. It adds events to the event hub each time the function is triggered.
using System;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Azure.EventHubs;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Host;

namespace EventsTriggerApp
{
    public static class EventTrigger
    {
        private static EventHubClient eventHubClient;

        // Placeholder: replace with your own connection string
        // (ideally read from application settings rather than hard-coded).
        private const string EventHubConnectionString = "add event hub connection string here";
        private const string EventHubName = "activityhub";

        // The CRON expression "0 */5 * * * *" fires the function every five minutes.
        [FunctionName("EventTrigger")]
        public static async Task Run(
            [TimerTrigger("0 */5 * * * *")] TimerInfo myTimer,
            TraceWriter log)
        {
            await MainAsync(log);
        }

        private static async Task MainAsync(TraceWriter log)
        {
            // Point the connection string at the specific event hub.
            var connectionStringBuilder = new EventHubsConnectionStringBuilder(EventHubConnectionString)
            {
                EntityPath = EventHubName
            };

            eventHubClient = EventHubClient.CreateFromConnectionString(connectionStringBuilder.ToString());

            await SendMessagesToEventHub(100, log);

            await eventHubClient.CloseAsync();
        }

        // Sends the given number of sample messages, one event per message.
        private static async Task SendMessagesToEventHub(int numMessagesToSend, TraceWriter log)
        {
            for (var i = 0; i < numMessagesToSend; i++)
            {
                try
                {
                    var message = $" New Document {i} Updated By User {i}";
                    log.Info($"Sending message: {message}");
                    await eventHubClient.SendAsync(new EventData(Encoding.UTF8.GetBytes(message)));
                }
                catch (Exception exception)
                {
                    // Log through the function's TraceWriter so the error shows up in the function logs.
                    log.Error($"{DateTime.Now} > Exception: {exception.Message}", exception);
                }

                await Task.Delay(10);
            }

            log.Info($"Finished sending loop for {numMessagesToSend} messages.");
        }
    }
}


  • Right-click the project and publish the function to the Function App you created in the portal

Step 4: Create another Azure Function App to process the events

  • Create another Function App in the portal and name it DataProcessApp

Step 5: Create a Cosmos DB account with the SQL API

  • Go to the Azure Portal
  • Go to Create a resource
  • Search for Cosmos DB and create a Cosmos DB account with the SQL API

image

Step 6: Create an Event Hub trigger function to process the events

  • Go to Visual Studio and create an EventHub Trigger function

image

  • Add the following NuGet package to your project

                  Microsoft.Azure.WebJobs.Extensions.DocumentDB

  • Then add the following code to your function class


using System;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Host;
using Microsoft.Azure.WebJobs.ServiceBus;
using SuperOffice.Data;

namespace DataProcessorApp
{
    public static class DataProcessor
    {
        [FunctionName("DataProcessor")]
        public static async Task Run(
            // Trigger: receives a batch of events from the "activityhub" event hub.
            [EventHubTrigger("activityhub", Connection = "EventHub")] string[] eventHubMessages,
            TraceWriter log,
            // Output: every object added to the collector is stored in Cosmos DB.
            [DocumentDB("FeedDataBase", "tenant2",
                        CreateIfNotExists = true,
                        ConnectionStringSetting = "CosmosDBConnection")]
            IAsyncCollector<UserActivity> userActivities)
        {
            log.Info($"C# Event Hub triggered with batch of messages: {eventHubMessages.Length}");

            foreach (var item in eventHubMessages)
            {
                log.Info($"C# Event Hub trigger function processed a message: {item}");

                // Sample data: the user ids and name are hard-coded for this demo.
                await userActivities.AddAsync(new UserActivity
                {
                    User = new SuperOffice.Data.User { Id = "1" },
                    Activity = new Activity
                    {
                        Type = "1",
                        CreatedOn = DateTime.Now,
                        CreatedBy = new SuperOffice.Data.User { Id = "2", Name = "jens" },
                        Description = item
                    }
                });
            }
        }
    }
}
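
The UserActivity, Activity and User types come from the SuperOffice.Data namespace, which is not shown in this post. As a rough sketch only, assuming nothing beyond the properties the function above actually sets, the model classes could look something like this (the real classes may well contain more fields):

```csharp
using System;

namespace SuperOffice.Data
{
    // Hypothetical model classes: the property names are inferred
    // from the object initializers used in the DataProcessor function.
    public class User
    {
        public string Id { get; set; }
        public string Name { get; set; }
    }

    public class Activity
    {
        public string Type { get; set; }
        public DateTime CreatedOn { get; set; }
        public User CreatedBy { get; set; }
        public string Description { get; set; }
    }

    public class UserActivity
    {
        public User User { get; set; }
        public Activity Activity { get; set; }
    }
}
```

Plain classes with public get/set properties like these are enough for the output binding, since the function only has to hand the objects to the collector.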


Notice that two bindings are added to the Run method in this code.


1. Event Hub Trigger Binding


[EventHubTrigger("activityhub", Connection = "EventHub")]  string[] eventHubMessages,


For this binding, you have to provide your event hub connection string in the local.settings.json file as follows.

image
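
In text form, a local.settings.json for this function might look roughly like the sketch below. The setting names EventHub and CosmosDBConnection are the ones referenced by the two bindings in the code; the values are placeholders you would replace with your own connection strings.

```json
{
  "IsEncrypted": false,
  "Values": {
    "AzureWebJobsStorage": "<storage account connection string>",
    "EventHub": "<event hub connection string>",
    "CosmosDBConnection": "<Cosmos DB connection string>"
  }
}
```

This file is only used when running locally. For the deployed Function App, the same setting names need to exist in its application settings.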


Also set the name of the event hub that should trigger this function (activityhub in this example). You also have to set the batch size in the host.json file.


{
  "eventHub": {
    "maxBatchSize": 64,
    "batchCheckpointFrequency": 1
  }
}


The maxBatchSize setting means that, even when thousands of events are arriving in the event hub, a single function invocation receives at most 64 events at once.

Once this binding is added, the function will trigger whenever new events are added to the event hub.


2. Cosmos DB output Binding

  [DocumentDB("FeedDataBase", "tenant2",
                        CreateIfNotExists =true,
                        ConnectionStringSetting = "CosmosDBConnection")]
                         IAsyncCollector<UserActivity> userActivities


For this binding, you have to set the database name, the collection name, and the name of the connection string setting. The Cosmos DB connection string also has to be added to the local.settings.json file.


As you can see, we have used an IAsyncCollector. With it, all the function has to do is add the objects that need to be stored to this collector. Serializing them and storing them in the database is handled by the Functions runtime itself, which means less code for the developer.


Now all is good. Deploy the function apps, then run the event trigger timer app, and you will see data being added to Cosmos DB.


Hope this helps. I will describe in more detail how batch processing and scaling work with Event Hubs and Function Apps in a later post.


Happy Coding with Azure !!!!

