Stream processing windowing using Azure Event Hubs

04 March 2019

This post is aimed at engineers designing systems that need to process streams of events. One particular solution is explored, using an Azure Event Hub and the Capture feature.

There are a few concepts that are helpful to understand before diving in.

  1. What is stream processing?
    Stream processing, as defined by Martin Kleppmann in his book Designing Data-Intensive Applications, is

    somewhere between online and offline/batch processing (so it is sometimes called near-real-time or nearline processing). Like a batch processing system, a stream processor consumes inputs and produces outputs (rather than responding to requests). However, a stream job operates on events shortly after they happen, whereas a batch job operates on a fixed set of input data. This difference allows stream processing systems to have lower latency than the equivalent batch systems.

  2. What is stream windowing?
    Windowing, as used in this post, is the process of breaking down events from a continuous stream into groups that occurred during a given time interval (typically small, on the order of minutes or seconds). One iteration of the time interval is considered a “window”.

  3. What are Azure Event Hubs?
    Azure Event Hubs is a managed service that provides a log-based message broker for sending and receiving messages. A log-based broker like Event Hubs maintains an append-only store of messages, and consumers track their own position in the log as they process messages. Messages are removed only when the log is compacted to remove duplicate messages or when the log begins to run out of space. Apache Kafka is another common implementation of a log-based message broker with similar semantics.

    This log-based broker should not be confused with “standard” message brokers like RabbitMQ or Azure Service Bus, which provide queueing semantics: messages are pulled from a queue, then locked by the broker until they are processed and deleted or returned to the queue. No central log is maintained. A toy sketch contrasting the two models follows this list.
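
To make the distinction concrete, below is a toy sketch of the log-based model. Nothing here is the real Event Hub API; it only illustrates how consumers own their positions in an append-only log:

using System.Collections.Generic;

// Toy model of a log-based broker. The log is append-only and reads never
// delete entries; each consumer simply advances its own offset.
public class ToyLog
{
  private readonly List<string> _entries = new List<string>();

  public void Append( string message ) => _entries.Add( message );

  // Returns false once the consumer has caught up to the end of the log.
  public bool TryRead( ref int offset, out string message )
  {
    if ( offset < _entries.Count )
    {
      message = _entries[offset++];
      return true;
    }
    message = null;
    return false;
  }
}

Because positions are per-consumer, two consumers can independently replay the same messages, which queue-based brokers do not offer.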

Getting started with Capture

A common approach to dealing with an “infinite” event stream is to break it down into time-based windows and process each window as a batch. Any message broker can support this approach, but typically you have to implement the windowing yourself. I like to offload logic like that to the platform, which we can do if we’re using Event Hubs. The Event Hub service provides a feature called Capture that will gather messages over a configurable window and write them as a batch to either Azure Data Lake Store or Azure Blob Storage.
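
As a minimal sketch of the windowing idea itself (independent of any broker), the helper below buckets events that carry a UTC timestamp into fixed, tumbling windows keyed by window start time. The event tuple shape here is hypothetical:

using System;
using System.Collections.Generic;
using System.Linq;

public static class TumblingWindows
{
  // Buckets timestamped events into fixed-size windows keyed by window start.
  public static IEnumerable<IGrouping<DateTime, (DateTime Timestamp, byte[] Body)>> Window(
    IEnumerable<(DateTime Timestamp, byte[] Body)> events, TimeSpan windowSize )
  {
    // Truncate each timestamp down to the start of the window containing it.
    return events.GroupBy( e => new DateTime(
      e.Timestamp.Ticks - ( e.Timestamp.Ticks % windowSize.Ticks ), DateTimeKind.Utc ) );
  }
}

Each resulting group can then be handed off as one batch. Capture effectively performs this bucketing on the service side, so the rest of this post lets the platform do it for us.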

For the rest of this post I’m assuming that your Event Hub has been configured to output to Blob Storage using a 5 minute/300 megabyte window. When Capture is configured, all messages inside a given Event Hub partition and time window are written to one blob file in the compact Avro format. The naming convention for the blob is {AzureNamespaceName}/{EventHubName}/{PartitionId}/{DateDownToTheSecond}.avro. If no messages are received during a window, Capture will still write an empty Avro file by default. I recommend turning this option off to make processing of the blobs easier.

Processing the windowed output

A scalable platform is essential for handling the Capture output. For big data analysis, Azure Data Lake works well. However, it can be overkill for smaller datasets and lower throughput services. An option for the low-scale case is Azure Functions with a blob trigger like the one below:

[FunctionName("CaptureBlobTrigger")]        
public static async Task RunAsync([BlobTrigger("myblobcontainername/{name}.avro")] Stream capturedWindowStream, string name, ILogger log)
{
    await ProcessWindowAsync( capturedWindowStream, log );
}

The function will run every time a new blob appears in the blob container myblobcontainername. Note that the BlobTrigger binds to the storage account in the function app’s AzureWebJobsStorage setting unless a Connection property is specified on the attribute, so that connection must point at the storage account Capture writes to. The ProcessWindowAsync method can then deserialize the input blob and process it. Microsoft provides the Microsoft.Avro.Core NuGet package for working with Avro files as shown below. The code deserializes the input and logs each unique message body contained in the window:

private static async Task ProcessWindowAsync( Stream capturedWindowStream, ILogger log )
{
  // based on https://gist.github.com/pshrosbree/74c8c4b4744c00cf3d92939952808d1e
  using ( IAvroReader<object> reader = AvroContainer.CreateGenericReader( capturedWindowStream ) )
  {
    while ( reader.MoveNext() )
    {
      foreach ( string recordBody in reader.Current
                    .Objects.Select( o => new AvroEventData( o ) )
                    // assumes UTF8 encoding of input message string
                    .GroupBy( r => Encoding.UTF8.GetString( r.Body ) )
                    .Select( g => g.Key ) )
      {
          log.LogInformation( $"{DateTime.Now} > Read Unique Item: {recordBody}" );
      }
    }
  }
}

private struct AvroEventData
{
  public AvroEventData( dynamic record )
  {
    SequenceNumber = (long) record.SequenceNumber;
    Offset = (string) record.Offset;
    DateTime.TryParse( (string) record.EnqueuedTimeUtc, out var enqueuedTimeUtc );
    EnqueuedTimeUtc = enqueuedTimeUtc;
    SystemProperties = (Dictionary<string, object>) record.SystemProperties;
    Properties = (Dictionary<string, object>) record.Properties;
    Body = (byte[]) record.Body;
  }
  public long SequenceNumber { get; set; }
  public string Offset { get; set; }
  public DateTime EnqueuedTimeUtc { get; set; }
  public Dictionary<string, object> SystemProperties { get; set; }
  public Dictionary<string, object> Properties { get; set; }
  public byte[] Body { get; set; }
}

For a production system, instead of logging, the messages in the window could be processed and analyzed. I hope this post helped you learn about Event Hub Capture and how it might be useful for processing event streams.

404s and web.config errors using git in WSL

18 January 2019

I’ve run into two problems with local development recently and I wanted to share the resolution to the problems in case it helps others.

IIS Express 404s

The first problem was after a fresh clone using git in a bash prompt under the Windows Subsystem for Linux (WSL). When I started the full framework ASP.NET application, the response was always a 404. IIS Express could not see content or application code in that directory no matter what I changed. The fix was to either re-clone using git in a regular Windows command prompt, or copy all the files to another folder manually created in Windows Explorer. Something about the permissions of the parent folder doesn’t work well with IIS Express when the folder is created through the WSL git executable.

Web.config not recognized

The second problem was with the web.config file not being recognized. I again had cloned the repository using git in a bash prompt under WSL. The file existed on disk but IIS Express was not reading it and applying the configuration. The fix for this problem was to rename the file in WSL bash to match the casing in the .csproj file.

For example, on disk the file was cloned as Web.config but the .csproj entry was:

    <Content Include="web.config">
      <SubType>Designer</SubType>
    </Content>

Note the mismatch in the casing of the first character. Once I renamed the file to all lowercase, as the .csproj file expected, everything worked. I believe this issue has something to do with case sensitivity differences between WSL and Windows.

High thread count in Azure Functions

09 October 2018

Recently Azure Functions runtime V2 was released and one of my team’s functions started to consume an abnormally high number of threads. When we dug into the details of the problem, the root cause was performing static initialization of logging on each function invocation. Specifically, we were using Serilog with the Application Insights sink and were recreating the sink each time the function ran. The sink was then assigned to a static variable (Log.Logger).

The code used to look something like this:

using System;
using Microsoft.ApplicationInsights.Extensibility;
using Microsoft.Azure.WebJobs;
using Serilog;

namespace ServiceBusThreadingProblem
{
   public static class HighThreadCountFunction
   {
      [FunctionName( "HighThreadCountFunction" )]
      public static void Run( [ServiceBusTrigger( "TestTopicName", "TestSubscriptionName", Connection = "TestSubscriptionConnectionString" )] string messageContents, ExecutionContext context, int deliveryCount, DateTime enqueuedTimeUtc, string messageId )
      {
         var logConfiguration = new LoggerConfiguration()
            .Enrich.WithProperty( "FunctionInvocationId", context.InvocationId )
            .Enrich.WithProperty( "QueueMessageMessageId", messageId )
            .Enrich.WithProperty( "MachineName", Environment.MachineName );

         logConfiguration.WriteTo.ApplicationInsightsTraces( new TelemetryConfiguration( Environment.GetEnvironmentVariable( "APPINSIGHTS_INSTRUMENTATIONKEY" ) ) );

         Log.Logger = logConfiguration.CreateLogger();
         Log.Information( $"C# ServiceBus topic trigger function processed message: {messageContents}" );
      }
   }
}

The issue is the re-assignment of the Log.Logger static variable each time the function runs. Looking at a memory dump, it was apparent that each of the ~400 threads was waiting somewhere in Application Insights code. That led us to examine the ApplicationInsightsTraces line, which is part of the Serilog initialization. A quick test confirmed that without that line, the thread count stayed at normal levels.

As a general principle in Azure Functions, if a class can manage external connections and is thread-safe, then it should be reused as a static variable. See the Azure Functions documentation on static client reuse. Since Serilog’s Log.Logger is a static variable and is thread-safe, the code above should not have been recreating the logger on each invocation.

Instead, the code should be refactored to reuse the Log.Logger variable. One pattern that makes this easy is using the Lazy<T> class to ensure only one instance of the logger is ever created, as shown below.

using System;
using Microsoft.ApplicationInsights.Extensibility;
using Microsoft.Azure.WebJobs;
using Serilog;
using Serilog.Context;
 
namespace ServiceBusThreadingProblem
{
   public static class NormalThreadCountFunction
   {
      private static Lazy<ILogger> LoggingInitializer = new Lazy<ILogger>( () =>
      {
         var logConfiguration = new LoggerConfiguration()
                 .Enrich.FromLogContext()
                 .Enrich.WithProperty( "MachineName", Environment.MachineName );

         logConfiguration.WriteTo.ApplicationInsightsTraces( new TelemetryConfiguration( Environment.GetEnvironmentVariable( "APPINSIGHTS_INSTRUMENTATIONKEY" ) ) );
         Log.Logger = logConfiguration.CreateLogger();
         return Log.Logger;
      } );

      [FunctionName( "NormalThreadCountFunction" )]
      public static void Run( [ServiceBusTrigger( "TestTopicName", "TestSubscriptionName", Connection = "TestSubscriptionConnectionString" )] string messageContents, ExecutionContext context, int deliveryCount, DateTime enqueuedTimeUtc, string messageId )
      {
         ILogger logger = LoggingInitializer.Value;

         using ( LogContext.PushProperty( "FunctionInvocationId", context.InvocationId ) )
         using ( LogContext.PushProperty( "QueueMessageMessageId", messageId ) )
         {
            logger.Information( $"C# ServiceBus topic trigger function processed message: {messageContents}" );
         }
      }
   }
}

In the code above, the Application Insights Serilog sink is no longer reinitialized on each function invocation. The Lazy field handles initializing the logger if one isn’t already available. With this refactoring, the function consistently uses only ~40 threads. Additionally, the thread count is not impacted by additional load.

Takeaways

To help your Azure Function scale and to avoid consuming extra resources, reuse as much as possible between invocations.

  • Use static clients to optimize connections. Examples of clients that should be reused are HttpClient, Azure Storage clients, and logging clients (see the sketch after this list).
  • Avoid reinitializing classes that can be reused. In my case, it was logging with Serilog but configuration is another candidate to be initialized once for all invocations of a function.
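
As a sketch of the static client guidance in the first bullet, a function can share one HttpClient across all invocations. The endpoint below is purely illustrative:

using System.Net.Http;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;

public static class StaticClientFunction
{
   // Created once per host instance and reused by every invocation,
   // avoiding per-call connection setup and socket exhaustion.
   private static readonly HttpClient Client = new HttpClient();

   [FunctionName( "StaticClientFunction" )]
   public static async Task Run( [ServiceBusTrigger( "TestTopicName", "TestSubscriptionName", Connection = "TestSubscriptionConnectionString" )] string messageContents )
   {
      HttpResponseMessage response = await Client.GetAsync( "https://example.com/api/process" );
      response.EnsureSuccessStatusCode();
   }
}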

Debugging Azure Functions with Windbg

05 October 2018

When all else fails troubleshooting a production issue, I often reach for Windbg to dig into the problem. For issues like high CPU or memory usage, Windbg is a great tool to pinpoint the root cause. Windbg allows you to open a memory dump of a running application and inspect it. Anywhere you can run .NET code should support taking a memory dump, which you can then inspect using Windbg. I recently had to troubleshoot an issue with a .net core Azure Function application, and while the steps were a little different than for a web application, it is possible. Note - this post assumes you have Windbg installed. If not, follow the link above and install it. I recommend the version available through the Windows store.

Take memory dump of running function

First, open the KUDU site for the problematic Azure Function running in Azure. The URL will look like https://<your-function-name-here>.scm.azurewebsites.net. Navigate to the Process Explorer tab and copy the PID of the process running your function. This might look odd because the process is running as W3WP.exe, but that is expected. There should only be two options, and one will be tagged with the scm tag, meaning it is the process running the scm site itself. Choose the W3WP.exe process not tagged scm and copy the PID.

Next, open the Debug Console tab. I usually choose the CMD option but either should work. Navigate to the logfiles directory and make a new directory to hold the dump files. I typically use dumpfiles as the name. Navigate into the new directory and run the following command to generate the dump file - D:\devtools\sysinternals\procdump -accepteula -ma <your-PID-here>. That will generate the dump file and place it in the current directory. Finally, download the file with the .dmp extension that was generated to your local machine.

Determine platform details and load into debugging session

Open Windbg and load the .dmp file into your debugging session. First, we have to determine whether the application was running as x86 or x64 (this will be important later on). You should be able to find the bitness in the output when the file is loaded. See the image below for an example (this application was running as x86).

(Image: Windbg load output showing the bitness of the dump)

Now we are ready to inspect the memory dump. The next step is to load the appropriate SOS.dll (the debug extension for .net) for the .net runtime your application runs under. The easiest way to ensure you have the right version is to download it from the KUDU site. To find where it’s located, run the following command in Windbg - .loadby sos clr for full framework applications or .loadby sos coreclr for .net core applications. That command attempts to load sos.dll from the same location where clr.dll/coreclr.dll lived on the machine that was running the application. Since we aren’t on that machine, the command fails, but it outputs the path to the dll in the error, as shown below.

(Image: the .loadby error output, which includes the path to sos.dll)

Take the path from the error message and navigate to that location in the KUDU site for your application. Note that you have to press the System Drive button in the Debug Console to navigate to the root of the D:\ drive. Then you can move to the path specified in the error message. In my case it was D:\Program Files (x86)\dotnet\shared\Microsoft.NETCore.App\2.1.4. Once you are there, find the SOS.dll in that directory and download it. Finally, load that dll via the .load command - .load C:\the\downloaded\file\path\SOS.dll.

Inspect the memory dump

At this point we have a memory dump of the application and the debug extension for .net is loaded. Providing concrete next steps is tough because it depends on your problem. A good place to start is by running !sos.threadpool to output statistics about the threads in the application and the CPU percentage. !sos.threads will show the details of each thread in the application.

Another useful debug extension to consider is MEX. Go to the MEX page and download the appropriate dll for your application - this is where the x86/x64 check we did earlier matters. Once you have the right version, load it via .load C:\path\to\MEX.dll. You can see all the .net related commands by running !mex.help -cat 'DotNet'. I often find !mex.sqlcn useful for looking at open sql connections, and !mex.us is helpful for automatically collecting the stack traces of running threads and grouping similar stacks together.

Other resources

Windbg commands are cryptic and difficult to remember, so I have the best luck googling for my problem with “windbg” in the query to find blog posts that talk about how to debug it. Good luck!


Full Framework WSFederation to OWIN Conversion

14 May 2018

If you have been using WSFederation in a .net web application for more than a year or two, chances are it is configured using the Microsoft.IdentityModel.Web or System.IdentityModel.Services libraries. Two HTTP modules, WSFederationAuthenticationModule and SessionAuthenticationModule, are added to the application to handle the WSFederation protocol, and configuration is done either by inheriting from those classes or at application start via the web.config. However, newer versions of asp.net favor “middleware” built on OWIN, in both full framework applications and .net core. The purpose of OWIN is to abstract the underlying web server from the web application; HttpModules are tightly coupled to System.Web and therefore to the IIS web server. Using OWIN does require some configuration and setup changes, which I will detail in this post.

Basic OWIN setup

First, if you don’t already have OWIN configured for your application, install the Microsoft.Owin and Microsoft.Owin.Host.SystemWeb NuGet packages. Then add a startup class like the one below to your application:

using System;
using System.Threading.Tasks;
using Microsoft.Owin;
using Owin;

[assembly: OwinStartup(typeof(OwinApp.Startup))]
namespace OwinApp
{
    public class Startup
    {
        public void Configuration(IAppBuilder app)
        {
        }
    }
}

Convert SessionAuthenticationModule into OWIN configuration

Once OWIN is installed, we can begin converting WSFederation. Previously, a SessionAuthenticationModule subclass would have been used to set up properties for the cookie that stores session information:

public class CustomSessionAuthenticationModule : SessionAuthenticationModule
{
  protected override void InitializePropertiesFromConfiguration()
  {
      CookieHandler.RequireSsl = true;
      CookieHandler.Name = "FederatedAuthCookie";
  }
}

and configured as an HTTP module in the web.config:

<modules>
  <add name="SessionAuthenticationModule" type="MyApp.CustomSessionAuthenticationModule, MyApp" preCondition="managedHandler" />
</modules>

In the OWIN pipeline, we’ll configure the cookie using the CookieAuthentication classes (from the Microsoft.Owin.Security.Cookies package) and their helper methods:

public class Startup
{
    public void Configuration(IAppBuilder app)
    {
        app.UseCookieAuthentication( new CookieAuthenticationOptions
        {
          // converted from the CookieHandler.Name = "FederatedAuthCookie"; line in SessionAuthenticationModule
          CookieName = "FederatedAuthCookie",
          // converted from the CookieHandler.RequireSsl = true; line in SessionAuthenticationModule
          CookieSecure = CookieSecureOption.Always
        } );
    }
}

Convert WSFederationAuthenticationModule into OWIN configuration

Next, we’ll convert our custom WSFederationAuthenticationModule to use the WsFederationAuthenticationMiddleware from the OWIN pipeline.

public class CustomWsFederationAuthenticationModule : WSFederationAuthenticationModule
{
  protected override void InitializeModule( HttpApplication context )
  {
      base.InitializeModule( context );

      RedirectingToIdentityProvider += OnRedirectingToIdentityProvider;
  }

  protected override void InitializePropertiesFromConfiguration()
  {
      Issuer = InstanceWideSettings.BaseStsUrl;
  }

  private void OnRedirectingToIdentityProvider( object sender, RedirectingToIdentityProviderEventArgs args )
  {
      // setting the realm in the OnRedirecting event allows it to be dynamic for multi-tenant applications
      args.SignInRequestMessage.Realm = Settings.BaseUrl;
  }
}

The code above will be removed and replaced with the UseWsFederationAuthentication helper below:

public void Configuration(IAppBuilder app)
{
   ...
   app.UseWsFederationAuthentication( new WsFederationAuthenticationOptions
   {
     // Pulls in STS Url and other metadata (like signing certificates)
     MetadataAddress = Settings.StsMetadataUrl,
     Notifications = new WsFederationAuthenticationNotifications
     {
         // replaces the OnRedirectingToIdentityProvider event
         RedirectToIdentityProvider = notification =>
         {
           notification.ProtocolMessage.Wtrealm = Settings.PresentationUrlRoot;
           return Task.FromResult( 0 );
         }
     },
     // Name this authentication type (for WIF)
     AuthenticationType = WsFederationAuthenticationDefaults.AuthenticationType,
     // Tells the pipeline to use the cookie authentication we configured above to store the WIF session
     SignInAsAuthenticationType = CookieAuthenticationDefaults.AuthenticationType
   } );
}

Move Global.asax.cs WSFederation configuration into OWIN configuration

Now that we’ve converted the two WSFederation HttpModules, we can finish configuring the OWIN pipeline by converting the remaining WSFederation configuration, whether it lived in the web.config or was set up at application start. In my case, I preferred to set up WSFederation in code using the FederationConfigurationCreated event, like the code below:

FederatedAuthentication.FederationConfigurationCreated += ( sender, args ) =>
{
  args.FederationConfiguration.IdentityConfiguration.AudienceRestriction.AudienceMode = System.IdentityModel.Selectors.AudienceUriMode.Always;

  // this method loads the list of relying parties for a multi-tenant application.
  List<string> relyingParties = GetRelyingParties();
  relyingParties.ForEach( rp => args.FederationConfiguration.IdentityConfiguration.AudienceRestriction.AllowedAudienceUris.Add( new Uri( rp ) ) );

  // This code loads the metadata url, parses it and updates the configuration with details from it, like the signing certificates
  args.FederationConfiguration.IdentityConfiguration.IssuerNameRegistry = new CustomMetadataParser( Settings.StsMetadataUrl );
};

The items configured above can be added to the UseWsFederationAuthentication configuration:

public void Configuration(IAppBuilder app)
{
   ...
   app.UseWsFederationAuthentication( new WsFederationAuthenticationOptions
   {
     ...
     TokenValidationParameters = new TokenValidationParameters()
     {
         // this replaces the IdentityConfiguration.AudienceRestriction setup
         ValidAudiences = GetRelyingParties(),
         ValidateAudience = true
     },
     // Pulls in STS Url and other metadata (like signing certificates) so we don't have to do custom metadata parsing
     MetadataAddress = Settings.StsMetadataUrl,
     ...
   } );
}

Additionally, in the Global.asax.cs file, if you wanted access to WSFederation events, you could declare specially named methods on your HttpApplication class and they would be invoked while the WSFederation protocol was executing. Two examples that I’ve used are shown below:

void WSFederationAuthenticationModule_SessionSecurityTokenCreated( object sender, SessionSecurityTokenCreatedEventArgs e )
{
   // extend the expiration of the session cookie to make it last 1 year
   DateTime now = DateTime.UtcNow;
   TimeSpan expiration = TimeSpan.FromDays( 365 );
   e.SessionToken = new SessionSecurityToken( e.SessionToken.ClaimsPrincipal, e.SessionToken.Context, now, now.Add( expiration ) ) { IsPersistent = true };

   e.WriteSessionCookie = true;
}

void WSFederationAuthenticationModule_RedirectingToIdentityProvider( object sender, RedirectingToIdentityProviderEventArgs e )
{
   // add client id parameter to outgoing wsfederation request
   e.SignInRequestMessage.Parameters.Add( "client_id", Settings.ClientId );
}

Again, these items can be replicated in the UseWsFederationAuthentication configuration:

public void Configuration(IAppBuilder app)
{
   ...
   app.UseWsFederationAuthentication( new WsFederationAuthenticationOptions
   {
     ...
     Notifications = new WsFederationAuthenticationNotifications
     {
         // replaces the WSFederationAuthenticationModule_RedirectingToIdentityProvider method
         RedirectToIdentityProvider = notification =>
         {
           notification.ProtocolMessage.Parameters.Add( "client_id", Settings.ClientId );
           return Task.FromResult( 0 );
         },
         // replaces the WSFederationAuthenticationModule_SessionSecurityTokenCreated method
         SecurityTokenValidated = notification =>
         {
           var newAuthenticationProperties = new AuthenticationProperties( notification.AuthenticationTicket.Properties.Dictionary );

           DateTime now = DateTime.UtcNow;
           // extend the expiration of the session cookie to make it last 1 year
           TimeSpan expiration = TimeSpan.FromDays( 365 );

           newAuthenticationProperties.IssuedUtc = now;
           newAuthenticationProperties.ExpiresUtc = now.Add( expiration );
           newAuthenticationProperties.IsPersistent = true;

           notification.AuthenticationTicket = new AuthenticationTicket( notification.AuthenticationTicket.Identity, newAuthenticationProperties );
           return Task.FromResult( 0 );
         }
     },
     ...
   } );
}

Wrap up

At this point, all of the old WSFederation code is replaced and WSFederation actions are handled by the OWIN pipeline. One thing to note - we are not able to reuse existing sessions, so existing user sessions will be invalidated by this change. Once users log in again at the STS, they’ll be issued a new cookie that works with the OWIN pipeline cookie authentication code.

