Avoid dual writes with SQL Server and change tracking - Part 2

20 December 2017

In my last post I suggested using the database stream writer pattern to avoid writing to multiple data stores outside of a transaction from your application. This post will detail the implementation. For my example the application is a C# application writing to SQL Server and tracking changes using the Change Tracking feature. The data model for this example is similar to YouTube: it contains users, channels and media. A user has one or more channels, which in turn have one or more pieces of media associated with them. The full schema is contained in this gist.

At a high level, there are 3 pieces of the architecture to consider.

  1. The primary application which will write to the SQL database. It will not deal with writes to the downstream data stores and will largely be ignored by this post.
  2. The SQL Server database which will be configured to track changes to all necessary tables.
  3. The monitor application which will consume the change tracking stream from the database and push updates to the downstream data stores (cache, search, etc.).

Database Implementation

First, change tracking must be enabled on the SQL Server database and tables. The SQL below enables change tracking on the database, configured to retain changes for seven days with automatic cleanup of expired changes, and then enables it on each of the three tables.

ALTER DATABASE BlogPostExample
SET CHANGE_TRACKING = ON  
  (CHANGE_RETENTION = 7 DAYS, AUTO_CLEANUP = ON)  

ALTER TABLE dbo.UserAccount
ENABLE CHANGE_TRACKING  
WITH (TRACK_COLUMNS_UPDATED = ON)

ALTER TABLE dbo.Channel
ENABLE CHANGE_TRACKING  
WITH (TRACK_COLUMNS_UPDATED = ON)

ALTER TABLE dbo.Media
ENABLE CHANGE_TRACKING  
WITH (TRACK_COLUMNS_UPDATED = ON)

One final table is required to track the current position in the change tracking stream our monitor has consumed. The following SQL will create the table and initialize it with the minimum change tracking version currently in the database:

CREATE TABLE [dbo].[CacheChangeTrackingHistory] (
   [CacheChangeTrackingHistoryId]   INT              IDENTITY (1, 1) NOT NULL,
   [TableName]                      NVARCHAR (512)   NOT NULL,
   [LastSynchronizationVersion]     BIGINT           NOT NULL
);
ALTER TABLE [dbo].[CacheChangeTrackingHistory]
   ADD CONSTRAINT [PK_CacheChangeTrackingHistory] PRIMARY KEY CLUSTERED ([CacheChangeTrackingHistoryId] ASC);

-- Add default values for last sync version
INSERT INTO dbo.CacheChangeTrackingHistory( TableName, LastSynchronizationVersion )
VALUES ('dbo.UserAccount', CHANGE_TRACKING_MIN_VALID_VERSION(Object_ID('dbo.UserAccount')))
INSERT INTO dbo.CacheChangeTrackingHistory( TableName, LastSynchronizationVersion )
VALUES ('dbo.Channel', CHANGE_TRACKING_MIN_VALID_VERSION(Object_ID('dbo.Channel')))
INSERT INTO dbo.CacheChangeTrackingHistory( TableName, LastSynchronizationVersion )
VALUES ('dbo.Media', CHANGE_TRACKING_MIN_VALID_VERSION(Object_ID('dbo.Media')))

Change Monitor Implementation

With the database properly configured we can start on the application that will consume the change tracking stream from SQL Server. Changes are read with the CHANGETABLE( CHANGES <table_name>, <last_sync_version> ) function. I will focus on UserAccount changes, but the code applies equally to the Channel and Media tables. When the monitor application starts, it kicks off a loop that processes change tracking updates and pushes them to downstream data stores. In this case the only downstream data store is a cache, represented by the ICache interface. If we had multiple downstream systems to update, the application would start one monitoring loop, with a distinct change tracking history table, for each system.
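
The ICache interface and UserAccountChangeModel type are not shown in this post, so here is a minimal sketch of what they might look like, inferred from how the monitor code below uses them. Treat it as illustrative rather than the exact types from the sample repository.

// Minimal sketches - member names are inferred from the monitor code below.
public interface ICache
{
   // Insert, update or remove an object in the downstream cache based on the operation type.
   void UpdateObject( string objectType, string operationType, int id, object value );
}

public class UserAccountChangeModel
{
   // Columns selected by the change tracking query.
   public int UserAccountId { get; set; }
   public string Email { get; set; }
   public string DisplayName { get; set; }
   public DateTime CreateDate { get; set; }

   // 'Insert', 'Update' or 'Delete', computed from SYS_CHANGE_OPERATION.
   public string OperationType { get; set; }
}

With those shapes in mind, the monitor loop looks like this: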

public static class ChangeTracker
{
    // Connection string for the monitored database; the "BlogPostExample" configuration entry name is an assumption.
    private static readonly string _connectionString = ConfigurationManager.ConnectionStrings[ "BlogPostExample" ].ConnectionString;

    internal static async Task StartChangeTrackingMonitorLoopAsync( CancellationToken token, ICache userAccountCache )
    {
       while ( true )
       {
          if ( token.IsCancellationRequested )
          {
             break;
          }

          using ( ChangeTrackingBatch<UserAccountChangeModel> userAccountChangesBatch = GetLatestUserChanges() )
          {
             UserAccountChangeModel[] userAccountChanges = ( await userAccountChangesBatch.GetItemsAsync() ).ToArray();
              foreach( var userAccount in userAccountChanges )
              {
                userAccountCache.UpdateObject( "user_account", userAccount.OperationType, userAccount.UserAccountId, userAccount );
              }
              userAccountChangesBatch.Commit();
          }

          await Task.Delay( 1000 );
       }
    }

    private static ChangeTrackingBatch<UserAccountChangeModel> GetLatestUserChanges()
    {
         string cmd = @"
DECLARE @last_synchronization_version BIGINT = (SELECT LastSynchronizationVersion FROM dbo.CacheChangeTrackingHistory WHERE TableName = 'dbo.UserAccount')

DECLARE @current_synchronization_version BIGINT = CHANGE_TRACKING_CURRENT_VERSION();
SELECT ct.UserAccountId, ua.Email, ua.DisplayName, ua.CreateDate
		, CASE WHEN ct.SYS_CHANGE_OPERATION = 'I' THEN 'Insert' WHEN ct.SYS_CHANGE_OPERATION = 'U' THEN 'Update' ELSE 'Delete' END AS OperationType
FROM dbo.UserAccount AS ua
	RIGHT OUTER JOIN CHANGETABLE(CHANGES dbo.UserAccount, @last_synchronization_version) AS ct ON ua.UserAccountId = ct.UserAccountId

UPDATE dbo.CacheChangeTrackingHistory
SET LastSynchronizationVersion = @current_synchronization_version
WHERE TableName = 'dbo.UserAccount'
";
         return new ChangeTrackingBatch<UserAccountChangeModel>( _connectionString, cmd );
    }
}
internal class ChangeTrackingBatch<T> : IDisposable
{
  private readonly string _command;
  private SqlTransaction _transaction;
  private IEnumerable<T> _items;
  private SqlConnection _connection;
  private readonly object _param;

  public ChangeTrackingBatch( string connectionString, string command, object param = null )
  {
     _connection = new SqlConnection( connectionString );
     _command = command;
     _param = param;
  }

  public async Task<IEnumerable<T>> GetItemsAsync( )
  {
     if ( _items != null )
     {
        return _items;
     }

      _connection.Open();
      // Snapshot isolation gives a consistent view while reading changes and advancing the history table
      _transaction = _connection.BeginTransaction( System.Data.IsolationLevel.Snapshot );
      // QueryAsync is Dapper's asynchronous query extension method
      _items = await _connection.QueryAsync<T>( _command, _param, _transaction );
     return _items;
  }

  public void Commit()
  {
     _transaction?.Commit();
     _connection?.Close();
  }

  public void Dispose()
  {
     Dispose( true );
     GC.SuppressFinalize( this );
  }

  protected virtual void Dispose( bool disposing )
  {
     if ( disposing )
     {
        _transaction?.Dispose();

        _connection?.Dispose();
     }
  }
}

One additional thing to note is the use of the Snapshot isolation level for the database transaction. This ensures we’re working with a consistent view of the database and prevents collisions with the change tracking cleanup process. (Snapshot isolation must be enabled on the database with ALTER DATABASE ... SET ALLOW_SNAPSHOT_ISOLATION ON before the transaction can use it.)
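
For completeness, here is a minimal sketch of how the monitor loop might be hosted in a console application. The RedisCache type is hypothetical and stands in for whatever ICache implementation your downstream store needs.

public static class Program
{
   public static void Main()
   {
      var cancellationSource = new CancellationTokenSource();

      // RedisCache is a hypothetical ICache implementation for the downstream cache.
      ICache userAccountCache = new RedisCache();

      Task monitorTask = ChangeTracker.StartChangeTrackingMonitorLoopAsync( cancellationSource.Token, userAccountCache );

      Console.WriteLine( "Change tracking monitor running. Press enter to stop." );
      Console.ReadLine();

      // Signal the loop to stop and wait for the current batch to finish.
      cancellationSource.Cancel();
      monitorTask.GetAwaiter().GetResult();
   }
}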

Wrap Up

At this point the solution is complete. Any update to the UserAccount table will be tracked and pushed into the cache by the change tracker class. If the update to the cache fails, the transaction will be rolled back and the LastSynchronizationVersion for the table will not advance. The monitor application will then retry applying changes, in order, until the change is pushed into the cache successfully or the change is cleaned up by the change tracking retention settings.

This solution is tied to the scalability of SQL Server, so for a write heavy application a different architecture may be necessary. For example, SQL Server could be replaced by a log stream like Kafka. However, for moderate scale applications this architecture will be more than adequate to handle load. We’ve solved the resiliency and race condition issues from the dual write scenario by ensuring that any successful database write will be pushed to downstream systems, in order. We’ve also improved the overall architecture of the primary application by removing the writes to secondary data stores. Plus we haven’t introduced any new data stores to learn and manage. For reference, the full application source code is available on GitHub.

Avoid dual writes with SQL Server and change tracking - Part 1

13 December 2017

Problem: Consistent updates to multiple data stores

A common problem in web applications is the need to persist updates to multiple data stores (SQL, cache, search, etc.). Rarely does an application deal with only one data store. How do we get one update from the application into all data stores? The most common approach is dual writes, where the application simply writes to each data store in parallel or in series. This is compelling because it’s easy to implement and works well in low traffic, low error scenarios.

[Figure: dual write example]

However, there are many tricky errors that can arise, the most common being a failure of one of the writes: one data store has the new data and one has stale data. Another problem is race conditions among the different data store updates, like in the diagram below. In this example, the value ends up as ‘2’ in the SQL database and ‘3’ in the Redis cache. No errors were thrown in this case, making it even harder to track down.

[Figure: dual write race condition example]

How do we deal with these problems? One approach is to build a process to monitor the databases, look for drift between the two, then correct the one that’s out of line. However, this is difficult because you have to infer which database is correct. It’s also slow to analyze the whole database so the difference between the two databases may be “in the wild” for some time.

A better solution is the unified log pattern. At a high level, the unified log pattern is implemented by persisting all writes to one stream data store like Kafka or Azure Event Hubs. The log is consumed by one or more applications that persist the updates to dependent data stores as shown below.

[Figure: unified log example]

This avoids many of the problems from the dual write scenario but it is difficult to introduce into existing systems that currently write to more traditional data stores like SQL. Additionally, if your team is used to working with traditional databases, a log data store can be a difficult mindset shift.

What else can we do? A better solution is to only write to the SQL database. Then consume the changelog of the database and update any dependent data stores. I call this approach database stream writer. This idea comes from a series of blog posts Martin Kleppmann did on Events and Stream Processing. With this system you can continue writing to the existing SQL database but your primary application no longer has to deal with updating dependent data stores.

[Figure: database as leader example]

How do we implement the database stream writer pattern?

This pattern is implemented by turning Change Data Capture on for your database. Change Data Capture exposes the changes made to a database to a third-party application based on its commit log. Many database vendors support this feature. For example, PostgreSQL exposes a method called logical decoding that can be used to parse updates from the write-ahead log. SQL Server calls this feature Change Data Capture; updates to tables are stored in system tables inside the same database and exposed via table-valued functions. SQL Server also has a lighter weight feature called Change Tracking that does not track individual column values but simply records when a table row is modified. For many applications, just knowing that a row changed is enough information to make the necessary updates to dependent data stores (like invalidating a cache).

With a change data capture system in place, the last step is to write an application to consume the change data capture stream and write the data to downstream systems. In my next post, I will detail how to implement a system like this using SQL Server’s Change Tracking and a C# application for consuming the change stream.

Solutions For Async/Await In MVC Action Filters

15 August 2017

Async/await has been available in .NET for years, but until the release of ASP.NET Core there was no way to create an MVC ActionFilter that uses async/await properly. Since async was not supported by the framework, there was no truly safe way to call async code from an ActionFilter. This has changed in ASP.NET Core, but if you are using ASP.NET MVC 5 or below you’re stuck.

Recently, I found a workaround: use an async HttpModule to load whatever data the ActionFilter will need. You could also do all the work of the ActionFilter in the HttpModule, but I prefer to keep the filter because it ties more closely into the rest of the MVC pipeline. My example will demonstrate moving async code out of an AuthorizationFilter, but the pattern will work with any ActionFilter.

ActionFilter to fix

This is an example authorization filter that does async work as part of authorizing the request. Because attributes do not have async methods to override, we’re stuck calling .Wait() and .Result to synchronously execute the task. This code is ripe for deadlocks.

public class WebAuthorizationFilter : AuthorizeAttribute
{
  public override void OnAuthorization( AuthorizationContext filterContext )
  {
     if ( AllowAnonymous( filterContext ) )
     {
        return;
     }

     Task<bool> isAuthorizedTask = DoAsyncAuthorizationWork( filterContext.HttpContext );
     isAuthorizedTask.Wait();

     bool isAuthorized = isAuthorizedTask.Result;
     if ( !isAuthorized)
     {
        filterContext.Result = new UnauthorizedResult();
     }
  }
}

Two classes are necessary to add the module:

New HttpModule to handle async code

Any async code goes in this class. Call the necessary methods, then add the results to HttpContext.Items.

public class WebAuthorizationAsyncModule : IHttpModule
{
    public void Init( HttpApplication context )
    {
       var authWrapper = new EventHandlerTaskAsyncHelper( AuthorizeRequestAsync );

       // Execute module early in pipeline during request authorization
       // To execute the module after the MVC route has been bound, use `context.AddOnPostAcquireRequestStateAsync` instead
       context.AddOnAuthorizeRequestAsync( authWrapper.BeginEventHandler, authWrapper.EndEventHandler );
    }

    private static async Task AuthorizeRequestAsync( object sender, EventArgs e )
    {
       HttpApplication httpApplication = (HttpApplication) sender;
       HttpContext context = httpApplication.Context;

       bool isAuthorized = await DoAsyncAuthorizationWork( context );

       // Store the result in HttpContext.Items for later access
       context.Items.Add( "IsAuthorized", isAuthorized );
    }

    // IHttpModule requires Dispose; there is nothing to clean up in this module.
    public void Dispose()
    {
    }
}

Module Registration Startup Class

This class registers the HttpModule created above with asp.net. You can also register in the web.config but I prefer to keep this kind of configuration in code.

public class PreApplicationStartCode
{
  public static void Start()
  {
    DynamicModuleUtility.RegisterModule( typeof( WebAuthorizationAsyncModule ) );
  }
}

The second code change required is to add the PreApplicationStartCode class to the startup classes registered with ASP.NET. To do this, add the assembly-level PreApplicationStartMethod attribute above your HttpApplication class in Global.asax.cs.

[assembly: PreApplicationStartMethod( typeof( Some.Code.PreApplicationStartCode ), "Start" )]
namespace Some.Code
 {
    public class WebApplication : HttpApplication
    {
      ...

Action Filter changes

This is the same authorization filter from above, changed to read the authorization result from HttpContext.Items instead of doing the work directly.

public class WebAuthorizationFilter : AuthorizeAttribute
{
  public override void OnAuthorization( AuthorizationContext filterContext )
  {
     if ( AllowAnonymous( filterContext ) )
     {
        return;
     }

     bool isAuthorized = (bool) filterContext.HttpContext.Items["IsAuthorized"];
     if ( !isAuthorized)
     {
        filterContext.Result = new UnauthorizedResult();
     }
  }
}

Next Steps

This example was deliberately simple, and as such it executes for every request. To only execute the module for specific URLs, you can inspect the HttpContext.Request.Url property. Alternatively, if you delay execution of the module until after the ‘Acquire State’ pipeline step in ASP.NET (context.AddOnPostAcquireRequestStateAsync in Module.Init), you can read the MVC route values from HttpContext.Request.RequestContext.RouteData.Values and only execute the module code for specific controllers and actions, as sketched below.
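
As a rough illustration of the route-based approach, here is a minimal sketch of the async handler with a controller check added. The 'Account' controller name is just a placeholder, and the rest mirrors the module shown earlier.

private static async Task AuthorizeRequestAsync( object sender, EventArgs e )
{
   HttpApplication httpApplication = (HttpApplication) sender;
   HttpContext context = httpApplication.Context;

   // Route values are only available after the 'Acquire State' step, so this version of the handler
   // assumes the module was registered with context.AddOnPostAcquireRequestStateAsync in Init.
   RouteData routeData = context.Request.RequestContext.RouteData;
   string controller = routeData == null ? null : routeData.Values["controller"] as string;

   // 'Account' is a placeholder - only do the expensive async work for the controllers that need it.
   if ( !string.Equals( controller, "Account", StringComparison.OrdinalIgnoreCase ) )
   {
      return;
   }

   bool isAuthorized = await DoAsyncAuthorizationWork( context );
   context.Items.Add( "IsAuthorized", isAuthorized );
}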

Effective logging in .NET

27 July 2017

Logging is one of those things that everyone says you should be doing but it’s hard to get it right. Good logging can be the difference between finding and fixing a bug in a few hours or that same fix taking days and additional releases to isolate a problem. In this post I’ll lay out my thoughts on logging and how to do it well. Hat tip to Nicholas Blumhardt for introducing many of these ideas to me.

Where are we?

The first question to ask is “what kind of thing am I building?”. If the answer is a .NET application or website, then you can skip this section. If you’re building a reusable library, then you should consider an abstraction that lets the consumer of your library decide how to log. The .NET ecosystem has many options for logging abstractions, but most require you to include a reference to the library in your dependency tree. For a cross-cutting concern like logging, you may run into libraries that use different logging abstractions or potentially conflicting versions of the same abstraction. For this reason I don’t like solutions that require a dependency on an external library. I also would prefer not to reinvent the wheel and create a new logging abstraction in each project I work on.

Enter LibLog, “a single file for you to either copy/paste or install via nuget, into your library/framework/application to provide a logging abstraction.” LibLog is just a source file, included in your library, that adds an internal ILog abstraction. Consumers of your library do not have to care about it because it also includes transparent support for common logging providers. Using reflection, LibLog finds the logging framework your application is using and logs to it, without the consumer having to write any code.
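
To give a feel for it, here is a rough sketch of what logging through LibLog inside a library class might look like, based on its documented pattern of one static ILog per class; the MediaProcessor class is hypothetical, so treat the details as approximate.

internal class MediaProcessor
{
   // LibLog pattern: resolve one static logger per class through LogProvider.
   private static readonly ILog Logger = LogProvider.For<MediaProcessor>();

   public void Process( string mediaId )
   {
      // Keep library logging at Debug level; surface real problems via exceptions or error codes instead.
      Logger.Debug( "Processing media " + mediaId );
   }
}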

One last piece of advice when adding logging to a library: keep the log levels low. It’s unlikely you need to log above the Debug level. Instead of logging at Warning or Error levels, communicate the problem in an obvious way using an exception or error code. Even Information logs should be kept to a minimum.

Application Logging

The more common scenario for developers is the need to log inside of an application like a desktop app or website. For that I prefer structured logging as opposed to text logging. Structured logs, rather than simply being a string of text, are a set of key-value properties with a timestamp. Where you might have a text log of "12-17-2016 - Logging in as user with id '1234' failed.", a structured log would look like '{ timestamp: '12-17-2016', UserId: 1234, action: 'log_in', status: 'failed' }'. The structured log conveys the same information (and can even be rendered as a string) but has the advantage of being queryable if stored in a log server that supports structured logs. Rather than searching for a string, you can quickly find all log_in actions with a status of failed using a query. Application Insights and Seq are two examples of logging servers that support structured logs.

Serilog is my preferred library for writing structured logs. Serilog pioneered an easy way to write structured logs in .NET. Rather than providing an object, you provide a message template, similar to a string.Format template, that looks something like Log.Information("{Action} finished with {Status} for {UserId}", Actions.LogIn, Statuses.Failed, userId);. Serilog can then render that log as a string like "action 'log_in' finished with status 'failed' for user id '1234'" or, if your logging server supports it, as a log object like

{
    "time": "2016-12-17",
    "level": "Information",
    "template": "{Action} finished with {Status} for {UserId}",
    "properties": {
        "action": "log_in",
        "status": "failed",
        "userId": "1234"
    }
}

Structured logging using message templates is also supported directly in .NET Core through the Microsoft.Extensions.Logging library. More information on structured logging can be found in Nicholas Blumhardt’s blog series.
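
As a point of comparison, here is a minimal sketch of the same event written through Microsoft.Extensions.Logging, assuming an ILogger<T> is injected into a hypothetical LogInService; the message template syntax is the same.

public class LogInService
{
   private readonly ILogger<LogInService> _logger;

   public LogInService( ILogger<LogInService> logger )
   {
      _logger = logger;
   }

   public void RecordFailedLogIn( int userId )
   {
      // Microsoft.Extensions.Logging uses the same message template syntax as Serilog.
      _logger.LogInformation( "{Action} finished with {Status} for {UserId}", "log_in", "failed", userId );
   }
}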

Correlation

When trying to track down an issue with a production application, it’s hard to tell which logs go with a given request unless you tie them together. This is commonly called a CorrelationId. Serilog supports adding it to all messages via LogContext -

// The logger must be configured with .Enrich.FromLogContext() for pushed properties to be attached
using (LogContext.PushProperty("CorrelationId", Request.Id))
{
  // All logs inside the context will have the CorrelationId added to them
}

Microsoft.Extensions.Logging also supports adding properties to all messages through ILogger.BeginScope(), and ASP.NET Core will even add this for you by default. You can also use a library like CorrelationId to ensure a CorrelationId is passed among services if you are running in a microservice environment where you want to track a request across service boundaries. In addition to the CorrelationId, properties like Environment and MachineName can be helpful when troubleshooting.
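
For completeness, here is a minimal sketch of the BeginScope equivalent of the Serilog example above; the dictionary-based scope state is a common convention, but how the properties surface depends on the configured provider.

public void ProcessRequest( ILogger logger, string correlationId )
{
   // Properties in the scope dictionary are attached to every log written inside the using block,
   // provided the logging provider is configured to include scopes.
   using ( logger.BeginScope( new Dictionary<string, object> { [ "CorrelationId" ] = correlationId } ) )
   {
      logger.LogInformation( "Processing request" );
   }
}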

Keep it DRY

Rather than sprinkling Log.Information(...) calls throughout your codebase I suggest creating a simple, generic class to encapsulate the logger (Credit to Erik Heemskerk for the idea) -

public class ApplicationMonitoring
{
  // from serilog
  ILogger Logger { get; }
  ILogContext LogContext { get; }
}

This class is deliberately kept simple so that it can be shared widely throughout your application’s code base. Then extend the class with whatever logging needs you might have using extension methods on the ApplicationMonitoring class -

public static class UserActionsApplicationMonitoring
{
   public static void UserLogInFailed( this ApplicationMonitoring monitoring, int userId )
   {
       monitoring.Logger.Information("{Action} finished with {Status} for {UserId}", Actions.LogIn, Statuses.Failed, userId);
   }

   public static void UserLogInSucceeded( this ApplicationMonitoring monitoring, int userId )
   {
       monitoring.Logger.Debug("{Action} finished with {Status} for {UserId}", Actions.LogIn, Statuses.Success, userId);
   }
}

This way the details of how you log are separate from your business logic. Additionally, if you want to add metrics tracking of your monitoring events it’s easy to add to the existing classes. Add an IMetrics property to your ApplicationMonitoring class then call it from the existing methods. For example, to track the count of failed logins change the UserLogInFailed method to -

   public static void UserLogInFailed( this ApplicationMonitoring monitoring, int userId )
   {
       monitoring.Logger.Information("{Action} finished with {Status} for {UserId}", Actions.LogIn, Statuses.Failed, userId);
       monitoring.Metrics.IncrementCounter( Counters.LogInFailed );
   }

The last thing that’s necessary for effective logs is a way to change logging levels on the fly to tap into extra detail for troubleshooting a production issue. How to do so is outside the scope of this post but it should be easy to do and should not require a restart of the application.
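
One common way to do this with Serilog is a LoggingLevelSwitch; below is a minimal sketch, assuming Serilog with the console sink. The switch can then be flipped at runtime from something like an admin-only endpoint or a configuration watcher.

public static class LoggingConfiguration
{
   // Holding onto the switch lets the application change the minimum level at runtime.
   public static readonly LoggingLevelSwitch LevelSwitch = new LoggingLevelSwitch( LogEventLevel.Information );

   public static ILogger CreateLogger()
   {
      return new LoggerConfiguration()
         .MinimumLevel.ControlledBy( LevelSwitch )
         .Enrich.FromLogContext()
         .WriteTo.Console()
         .CreateLogger();
   }
}

// Later, e.g. from an admin-only endpoint, drop to Debug without restarting:
// LoggingConfiguration.LevelSwitch.MinimumLevel = LogEventLevel.Debug;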

I hope this post gave some clarity on effective logging in .net. Logging is an incredibly effective tool for troubleshooting a production application but it takes work. Instead of leaving it to the end of your project, I suggest laying the groundwork for logs in the beginning to set yourself up for success. Make it easy to write good logs to help developers on your project “fall into the pit of success”.

Implementing 'Keep Me Signed In' in Windows Identity Foundation

04 February 2014

A common feature of website authentication is the ‘Remember me’ or ‘Keep me signed in’ option. This is not a built-in feature of Windows Identity Foundation. The easiest solution is to make all Relying Party cookies session cookies, meaning they expire when you close the browser. When you navigate back to the relying party you’ll be sent to the STS, automatically logged in and sent back. This can be a pain for a number of reasons, so it’s ideal if we can set up the Relying Party cookies the same as the STS cookies. I’ll show how it can be implemented using claims as the means of communication between the STS and the Relying Party.

The STS setup

To communicate whether or not the user wanted to be remembered, we’re going to use claims. Specifically, we’ll be using two existing claims from the Microsoft.IdentityModel.Claims namespace, IsPersistent and Expiration. To do so, first add the claims to the FederationMetadata XML so that you see something like this:

<auth:ClaimType xmlns:auth="http://docs.oasis-open.org/wsfed/authorization/200706" Uri="http://schemas.microsoft.com/ws/2008/06/identity/claims/ispersistent" Optional="true">
<auth:DisplayName>isPersistent</auth:DisplayName>
<auth:Description>If subject wants to be remembered for login.</auth:Description>
</auth:ClaimType>
<auth:ClaimType xmlns:auth="http://docs.oasis-open.org/wsfed/authorization/200706" Uri="http://schemas.microsoft.com/ws/2008/06/identity/claims/expiration" Optional="true">
<auth:DisplayName>Expiration</auth:DisplayName>
<auth:Description>How long before the persistent session cookie should expire</auth:Description>
</auth:ClaimType>

As the description states, we’ll be using the IsPersistent claim to communicate if the user wanted to be kept logged in and the Expiration claim to communicate the session expiration if IsPersistent is true.

The last step on the STS is to set the claims on the user’s principal. Update the IClaimsPrincipal creation code to specify the two new claims.

public static IClaimsPrincipal CreatePrincipal( UserModel user, bool rememberMe )
{
   if ( user == null )
   {
      throw new ArgumentNullException( "user" );
   }

   // CLAIMS ADDED HERE SHOULD MATCH WITH CLAIMS OFFERED BY METADATA
   var claims = new List<Claim>
   {
      ... // Your other claims go here
      new Claim( ClaimTypes.IsPersistent, rememberMe.ToString() ),
      new Claim( ClaimTypes.Expiration, TimeSpan.FromDays( DEFAULT_COOKIE_EXPIRATION_IN_DAYS ).ToString() )
   };

   var identity = new ClaimsIdentity( claims );

   return ClaimsPrincipal.CreateFromIdentity( identity );
}

The two steps above ensure that the STS will communicate the necessary information to the Relying Party for them to set up their session to mirror the STS session.

Relying Party setup

On the Relying Party side we have to override the default WIF behavior for the session expiration and set it manually based on the claims we’ve specified in the STS. We’ll do this by handling the SessionSecurityTokenCreated event. Place the following code in the Global.asax of the Relying Party.

// This method does not appear to be used, but it is.
// WIF detects it is defined here and calls it.
// Note: Do not rename this method. The name must exactly match or it will not work.
[System.Diagnostics.CodeAnalysis.SuppressMessage( "Microsoft.Performance", "CA1811:AvoidUncalledPrivateCode" )]
void WSFederationAuthenticationModule_SessionSecurityTokenCreated( object sender, SessionSecurityTokenCreatedEventArgs e )
{
   bool isPersistent = false;
   string expirationAsString = null;
   try
   {
      isPersistent = ClaimsHelper.GetClaimValueByTypeFromPrincipal<bool>( e.SessionToken.ClaimsPrincipal, ClaimTypes.IsPersistent );
      expirationAsString = ClaimsHelper.GetClaimValueByTypeFromPrincipal<string>( e.SessionToken.ClaimsPrincipal, ClaimTypes.Expiration );
   }
   catch ( ClaimParsingException )
   {
      Trace.TraceWarning( "Failure to parse claim values for ClaimTypes.IsPersistent and ClaimTypes.Expiration. Using session cookie as a fallback." );
   }
   catch ( ClaimNullException )
   {
      Trace.TraceWarning( "Expected claim values for ClaimTypes.IsPersistent and ClaimTypes.Expiration but got null. Using session cookie as a fallback." );
   }

   TimeSpan expiration;
   if ( isPersistent && TimeSpan.TryParse( expirationAsString, CultureInfo.InvariantCulture, out expiration ) )
   {
      DateTime now = DateTime.UtcNow;
      e.SessionToken = new SessionSecurityToken( e.SessionToken.ClaimsPrincipal, e.SessionToken.Context, now, now.Add( expiration ) )
      {
         IsPersistent = true
      };
   }
   else
   {
      e.SessionToken = new SessionSecurityToken( e.SessionToken.ClaimsPrincipal, e.SessionToken.Context )
      {
         IsPersistent = false
      };
   }
   e.WriteSessionCookie = true;
}

The important part is at the end. We create a new SessionSecurityToken object based on the values of the claims and overwrite the default WIF security token with it. This gives us either a session cookie or a cookie with an expiration that matches the STS value, which is exactly the ‘Keep me signed in’ behavior we wanted.

