Richard Lemmens website

Copyright:
PublicDomain
This text content and maps on this page are in the public domain. This means you are free to copy, adapt, share, distribute and even capitalize on it.

Error handling in Azure worker roles

Azure Worker roles are suitable for background processes that take a long time, or workers that process repeating commands / input. In an ideal situation, they keep on running almost continuously, only occasionally interrupted by an update to the underlying virtual machine. However, reality is not ideal. Problems do occur and not all can be foreseen. The latter will often raise some unexpected exception in the code of the worker role. Microsoft's way of letting worker roles deal with uncatched exceptions is to restart them. This may solve some problems, but not all. If the cause of the exception is not taken away, the exception is likely to re-occur, causing another restart. The result is that the worker role goes into an endless loop of crashes and restarts, which is hard to debug because everything is reset as soon as it has been set up.

I prefer to adopt a different pattern. Errors should be catched, logged and alerts should be raised. Below is some example code that shows how to do that. There are a couple of things to note:

		
using System.Linq;

namespace myNamespace
{
    public class MyWorkerRole : Microsoft.WindowsAzure.ServiceRuntime.RoleEntryPoint, System.IDisposable
    {
        private const int sleepInterval = 500; // in milliseconds
        private const int killTimeout = 50; // in milliseconds

        // Sempahores
        private readonly System.Threading.CancellationTokenSource cancellationTokenSource = new System.Threading.CancellationTokenSource();
        private readonly System.Threading.ManualResetEvent runCompleteEvent = new System.Threading.ManualResetEvent(false);
        // Service
        private bool healthy = false;
        // Monitoring
        private volatile string state = null;
        private System.Net.Sockets.Socket listener = null;
        private System.Threading.Thread listenerThread = null;

        #region startup

        public override bool OnStart()
        {
            System.Net.ServicePointManager.DefaultConnectionLimit = 12;
            bool result = base.OnStart();
            SetupListener();
            if (this.LogLimit >= LogLevel.Info)
            {
                System.Diagnostics.Trace.TraceInformation("MyWorkerRole has been started");
            }
            return result;
        }

        public override void Run()
        {
            System.Diagnostics.Trace.TraceInformation("MyWorkerRole is running");
            try
            {
                this.random = new System.Random();
                this.RunAsync(this.cancellationTokenSource.Token).Wait();
            }
            finally
            {
                this.runCompleteEvent.Set();
            }
        }

        private async System.Threading.Tasks.Task RunAsync(System.Threading.CancellationToken cancellationToken)
        {
            ReportAndLog("Initializing", LogLevel.Debug, false);
            if (this.listenerThread == null || (this.listenerThread.ThreadState & (System.Threading.ThreadState.Unstarted | System.Threading.ThreadState.Stopped)) != 0)
            {
                this.listenerThread = new System.Threading.Thread(new System.Threading.ParameterizedThreadStart(ProcessMonitoringRequests));
                this.listenerThread.Start(cancellationToken);
            }
            this.healthy = true;
            while (!cancellationTokenSource.IsCancellationRequested)
            {
                try
                {
                    if (this.healthy)
                    {
                        ReportAndLog("Working", LogLevel.Debug, false);
                        DoMyWork(cancellationToken); 
                        ReportAndLog("Sleeping", LogLevel.Debug, false);
                    }
                }
                catch (System.Exception ex)
                {
                    this.healthy = false;
                    System.Diagnostics.Trace.TraceError(string.Format("{0} while working: {1}\n{2}", ex.GetType().FullName, ex.Message, ex.StackTrace));
                    try
                    {
                        string message = "Error while working: " + ex.ToString();
                        this.state = message;
                        SendAdministratorAlerts("Fatal error while working", message);
                    }
                    catch (System.Exception)
                    { }
                }
                await System.Threading.Tasks.Task.Delay(sleepInterval, cancellationToken);
            }
            KillThread(this.listenerThread);
        }

        private void SetupListener()
        {
            Microsoft.WindowsAzure.ServiceRuntime.RoleInstance roleInstance = Microsoft.WindowsAzure.ServiceRuntime.RoleEnvironment.CurrentRoleInstance;
            System.Net.IPEndPoint endpoint = roleInstance.InstanceEndpoints["pdf"].IPEndpoint;
            this.listener = new System.Net.Sockets.Socket(endpoint.AddressFamily, System.Net.Sockets.SocketType.Stream, System.Net.Sockets.ProtocolType.Tcp);
            this.listener.Bind(endpoint);
            this.listener.Listen(3);
        }

        #endregion

        #region shutdown

        public override void OnStop()
        {
            if (this.LogLimit >= LogLevel.Info)
            {
                System.Diagnostics.Trace.TraceInformation("MyWorkerRole is stopping");
            }
            this.cancellationTokenSource.Cancel();
            this.runCompleteEvent.WaitOne();
            base.OnStop();
            System.Diagnostics.Trace.TraceInformation("MyWorkerRole has stopped");
        }

        public void Dispose()
        {
            Dispose(true);
            System.GC.SuppressFinalize(this);
        }

        protected virtual void Dispose(bool disposing)
        {
            if (disposing)
            {
                this.cancellationTokenSource.Dispose();
                this.runCompleteEvent.Close();
            }
        }

        private static void KillThread(System.Threading.Thread thread)
        {
            if (thread != null && thread.IsAlive)
            {
                thread.Interrupt(); // signal worker thread to abort a.s.a.p.
                thread.Join(killTimeout); // wait until worker thread is completely cleaned up
                if (thread.IsAlive)
                {
                    // worker thread still not dead, kill it immediately
                    thread.Abort();
                }
            }
        }

        #endregion

        private void DoWork(System.Threading.CancellationToken cancellationToken)
        {
            // Does the actual work of the worker role.
            // Not detailed in this example.
        }

        #region monitoring

        private void ProcessMonitoringRequests(object data)
        {
            System.Threading.CancellationToken cancellationToken = (System.Threading.CancellationToken) data;
            while (!cancellationToken.IsCancellationRequested)
            {
                // NB: The call to the Accept method blocks the thread, so that the while loop will not pick up a cancellation request until a request for the listener comes in.
                System.Net.Sockets.Socket handler = this.listener.Accept();
                // Handle the request on a separate thread, so that this thread is available again to receive other requests
                System.Threading.Thread handlerThread = new System.Threading.Thread(new System.Threading.ParameterizedThreadStart(ProcessMonitoringRequest));
                handlerThread.Start(handler);
            }
        }

        private void ProcessMonitoringRequest(object data)
        {
            System.Net.Sockets.Socket handler = (System.Net.Sockets.Socket) data;
            try
            {
                byte[] requestBytes = new byte[256];
                handler.Receive(requestBytes);
                string request = System.Text.Encoding.ASCII.GetString(requestBytes, 0, requestBytes.Length);
                request = (request == null) ? null : request.TrimEnd(new char[] { '\0' });
                string response = null;
                switch (request)
                {
                    case "get status":
                        response = this.state;
                        break;
                    default:
                        response = "Error: Unknown request";
                        break;
                }
                byte[] responseBytes = System.Text.Encoding.ASCII.GetBytes(response);
                handler.Send(responseBytes);
            }
            finally
            {
                handler.Shutdown(System.Net.Sockets.SocketShutdown.Both);
                handler.Close();
            }
        }

        #endregion

        private void ReportAndLog(string message, LogLevel logLevel, bool log)
        {
            message = string.Format("[{0}] {1}", Microsoft.WindowsAzure.ServiceRuntime.RoleEnvironment.CurrentRoleInstance.Id, message);
            this.state = message;
            if (log && this.LogLimit >= logLevel)
            {
                switch (logLevel)
                {
                    case LogLevel.Fatal:
                    case LogLevel.Error:
                        System.Diagnostics.Trace.TraceError(message);
                        break;
                    case LogLevel.Warning:
                        System.Diagnostics.Trace.TraceWarning(message);
                        break;
                    case LogLevel.Info:
                    case LogLevel.Debug:
                        System.Diagnostics.Trace.TraceInformation(message);
                        break;
                    default:
                        throw new System.ArgumentOutOfRangeException("logLevel", "Unknown logging level: " + logLevel);
                }
            }
        }

        private LogLevel LogLimit
        {
            get
            {
                string logLevelStr = Microsoft.Azure.CloudConfigurationManager.GetSetting("logLevel");
                return (LogLevel) System.Convert.ToInt32(logLevelStr);
            }
        }

        private static void SendAdministratorAlerts(string subject, string body)
        {
            // Sends out an email alert to all configured administrators.
            // Not detailed in this example.
        }
    }

    internal enum LogLevel
    {
        Fatal = 0,
        Error,
        Warning,
        Info,
        Debug
    }
}