Azure Worker roles are suitable for background processes that take a long time, or workers that process repeating commands / input. In an ideal situation, they keep on running almost continuously, only occasionally interrupted by an update to the underlying virtual machine. However, reality is not ideal. Problems do occur and not all can be foreseen. The latter will often raise some unexpected exception in the code of the worker role. Microsoft's way of letting worker roles deal with uncaught exceptions is to restart them. This may solve some problems, but not all. If the cause of the exception is not taken away, the exception is likely to re-occur, causing another restart. The result is that the worker role goes into an endless loop of crashes and restarts, which is hard to debug because everything is reset as soon as it has been set up.
I prefer to adopt a different pattern. Errors should be caught and logged, and alerts should be raised. Below is some example code that shows how to do that. There are a couple of things to note:
- The worker role logs its activity to the diagnostics trace. You can use Microsoft's default trace listener to view the output, and/or attach your own. Logging level is variable and read from the configuration of the worker role, where it should be configured of course.
- A TCP listener is set up that reports the state of the worker role to a TCP client. The implementation is rather basic: it only responds to a text command "get status" and the state is exposed as a simple string. Of course this listener should be properly shielded from unauthorized access, for example via an Azure virtual network.
- If an unhandled exception occurs, it is caught in the RunAsync method.
The exception is both logged to the diagnostics trace and an alert email is sent.
Any exception occurring while doing that is discarded; we don't want a cascade of them.
The exception causes the "healthy" boolean to be set to false, which blocks any further work and leaves the worker role idling: it keeps running and keeps answering status requests, but does no work. An administrator can now act on the alert and the state message, and restart the worker role manually after fixing the problem.
using System.Linq;
namespace myNamespace
{
public class MyWorkerRole : Microsoft.WindowsAzure.ServiceRuntime.RoleEntryPoint, System.IDisposable
{
// Pause between two iterations of the work loop in RunAsync.
private const int sleepInterval = 500; // in milliseconds
// Time KillThread waits for an interrupted thread to finish before aborting it.
private const int killTimeout = 50; // in milliseconds
// Synchronization primitives used to coordinate shutdown:
// OnStop cancels the token source; the token is handed to RunAsync by Run.
private readonly System.Threading.CancellationTokenSource cancellationTokenSource = new System.Threading.CancellationTokenSource();
// Set by Run when it finishes; OnStop waits on it before returning.
private readonly System.Threading.ManualResetEvent runCompleteEvent = new System.Threading.ManualResetEvent(false);
// Service
// True while the work loop is allowed to do work; RunAsync sets it to false after an exception.
private bool healthy = false;
// Monitoring
// Latest status message; written by ReportAndLog and by the error handler in RunAsync,
// read by ProcessMonitoringRequest on a request-handler thread (hence volatile).
private volatile string state = null;
// Listening socket created by SetupListener; ProcessMonitoringRequests accepts connections on it.
private System.Net.Sockets.Socket listener = null;
// Thread running ProcessMonitoringRequests; started by RunAsync and stopped via KillThread.
private System.Threading.Thread listenerThread = null;
#region startup
/// <summary>
/// Prepares the role instance: raises the default connection limit, runs the base
/// start-up, opens the monitoring listener and, at Info level or above, traces the start.
/// </summary>
/// <returns>The value returned by the base class' OnStart.</returns>
public override bool OnStart()
{
    // Maximum number of concurrent connections allowed per service point.
    System.Net.ServicePointManager.DefaultConnectionLimit = 12;

    bool baseStarted = base.OnStart();

    SetupListener();

    bool infoEnabled = this.LogLimit >= LogLevel.Info;
    if (infoEnabled)
    {
        System.Diagnostics.Trace.TraceInformation("MyWorkerRole has been started");
    }

    return baseStarted;
}
/// <summary>
/// Main entry point of the role instance. Blocks until RunAsync has completed and then
/// signals runCompleteEvent, which OnStop waits on before it returns.
/// </summary>
public override void Run()
{
    try
    {
        // Gate the message on the configured log level, like the other Info traces in
        // this class. It sits inside the try so that the completion event is signalled
        // even if reading the log level fails.
        if (this.LogLimit >= LogLevel.Info)
        {
            System.Diagnostics.Trace.TraceInformation("MyWorkerRole is running");
        }

        // Run() must be synchronous, so block on the asynchronous work loop here.
        this.RunAsync(this.cancellationTokenSource.Token).Wait();
    }
    finally
    {
        // Always release OnStop, whether the loop ended normally or with an exception.
        this.runCompleteEvent.Set();
    }
}
/// <summary>
/// Work loop of the role. Starts the monitoring listener thread, then repeatedly does the
/// work and sleeps for sleepInterval milliseconds until cancellation is requested.
/// An exception raised by the work is traced, published as the state, reported to the
/// administrators and marks the role as unhealthy; from then on the loop keeps running
/// but no longer does any work.
/// </summary>
/// <param name="cancellationToken">Token that is cancelled when the role has to stop.</param>
/// <returns>A task that completes when the loop has ended and the listener thread has been stopped.</returns>
private async System.Threading.Tasks.Task RunAsync(System.Threading.CancellationToken cancellationToken)
{
    ReportAndLog("Initializing", LogLevel.Debug, false);

    // (Re)create the listener thread when there is none or the previous one can no longer be used.
    if (this.listenerThread == null || (this.listenerThread.ThreadState & (System.Threading.ThreadState.Unstarted | System.Threading.ThreadState.Stopped)) != 0)
    {
        this.listenerThread = new System.Threading.Thread(new System.Threading.ParameterizedThreadStart(ProcessMonitoringRequests));
        this.listenerThread.Start(cancellationToken);
    }

    this.healthy = true;
    try
    {
        while (!cancellationToken.IsCancellationRequested)
        {
            try
            {
                if (this.healthy)
                {
                    ReportAndLog("Working", LogLevel.Debug, false);
                    DoWork(cancellationToken);
                    ReportAndLog("Sleeping", LogLevel.Debug, false);
                }
            }
            catch (System.Exception ex)
            {
                // Presumably the work observes the token; a cancellation that surfaces
                // while the role is stopping is a normal shutdown, not a fault.
                if (ex is System.OperationCanceledException && cancellationToken.IsCancellationRequested)
                {
                    break;
                }

                // Stop doing work, but keep the role alive so that it can be inspected.
                this.healthy = false;
                System.Diagnostics.Trace.TraceError(string.Format("{0} while working: {1}\n{2}", ex.GetType().FullName, ex.Message, ex.StackTrace));
                try
                {
                    string message = "Error while working: " + ex.ToString();
                    this.state = message;
                    SendAdministratorAlerts("Fatal error while working", message);
                }
                catch (System.Exception)
                {
                    // Deliberately ignored: a failing alert must not cause a cascade of errors.
                }
            }

            try
            {
                await System.Threading.Tasks.Task.Delay(sleepInterval, cancellationToken);
            }
            catch (System.OperationCanceledException)
            {
                // Task.Delay throws when the token is cancelled during the delay.
                // That is the normal way for this loop to end, so leave it quietly.
                break;
            }
        }
    }
    finally
    {
        // Stop the listener thread no matter how the loop ended.
        KillThread(this.listenerThread);
    }
}
/// <summary>
/// Creates the TCP socket for monitoring requests, binds it to the instance endpoint
/// named "pdf" and puts it in listening mode with a backlog of 3 pending connections.
/// </summary>
private void SetupListener()
{
    System.Net.IPEndPoint monitoringEndpoint =
        Microsoft.WindowsAzure.ServiceRuntime.RoleEnvironment.CurrentRoleInstance.InstanceEndpoints["pdf"].IPEndpoint;

    System.Net.Sockets.Socket socket = new System.Net.Sockets.Socket(
        monitoringEndpoint.AddressFamily,
        System.Net.Sockets.SocketType.Stream,
        System.Net.Sockets.ProtocolType.Tcp);

    // Publish the socket in the field before binding, as the rest of the class expects it there.
    this.listener = socket;
    socket.Bind(monitoringEndpoint);
    socket.Listen(3);
}
#endregion
#region shutdown
/// <summary>
/// Stops the role instance: requests cancellation of the work loop, waits until Run has
/// signalled its completion and then runs the base shutdown. Both progress messages are
/// traced only when the configured log level is Info or more verbose.
/// </summary>
public override void OnStop()
{
    if (this.LogLimit >= LogLevel.Info)
    {
        System.Diagnostics.Trace.TraceInformation("MyWorkerRole is stopping");
    }

    this.cancellationTokenSource.Cancel();

    // Run sets this event in its finally block, so this returns once the work loop has ended.
    this.runCompleteEvent.WaitOne();

    base.OnStop();

    // Gated like the "is stopping" message above, so that both honour the configured level.
    if (this.LogLimit >= LogLevel.Info)
    {
        System.Diagnostics.Trace.TraceInformation("MyWorkerRole has stopped");
    }
}
/// <summary>
/// Releases the resources held by this instance and tells the garbage collector
/// that finalization is no longer needed.
/// </summary>
public void Dispose()
{
    this.Dispose(disposing: true);
    System.GC.SuppressFinalize(this);
}
/// <summary>
/// Releases the cancellation token source and the run-complete event.
/// </summary>
/// <param name="disposing">
/// True when called from <see cref="Dispose()"/>; nothing is released when false.
/// </param>
protected virtual void Dispose(bool disposing)
{
    if (!disposing)
    {
        return;
    }

    this.cancellationTokenSource.Dispose();
    this.runCompleteEvent.Close();
}
/// <summary>
/// Stops a thread: first interrupts it and waits up to killTimeout milliseconds for it
/// to end, then aborts it when it is still alive. Does nothing for a null or dead thread.
/// </summary>
/// <param name="thread">The thread to stop; may be null.</param>
private static void KillThread(System.Threading.Thread thread)
{
    if (thread == null || !thread.IsAlive)
    {
        return;
    }

    // Ask politely first and give the thread a moment to clean up.
    thread.Interrupt();
    thread.Join(killTimeout);

    if (!thread.IsAlive)
    {
        return;
    }

    // Still running after the grace period: force it to stop.
    thread.Abort();
}
#endregion
/// <summary>
/// Placeholder for the actual work of the worker role; the body is intentionally empty
/// in this example.
/// </summary>
/// <param name="cancellationToken">Cancellation token offered to the work; unused by this empty placeholder.</param>
private void DoWork(System.Threading.CancellationToken cancellationToken)
{
// Does the actual work of the worker role.
// Not detailed in this example.
}
#region monitoring
/// <summary>
/// Body of the listener thread. Accepts incoming monitoring connections and hands each
/// one to its own thread running ProcessMonitoringRequest, until cancellation is requested.
/// </summary>
/// <param name="data">The <see cref="System.Threading.CancellationToken"/> to observe, boxed as object.</param>
private void ProcessMonitoringRequests(object data)
{
    System.Threading.CancellationToken cancellationToken = (System.Threading.CancellationToken) data;

    // Accept() blocks until a client connects, which would keep this loop from ever seeing
    // a cancellation request. Wait for a pending connection with a timeout instead.
    // The timeout is half of killTimeout, so the thread normally ends within the grace
    // period that KillThread allows. Socket.Poll expects microseconds.
    const int pollIntervalMicroseconds = killTimeout * 1000 / 2;

    try
    {
        while (!cancellationToken.IsCancellationRequested)
        {
            // For a listening socket, SelectRead reports whether a connection is pending.
            if (!this.listener.Poll(pollIntervalMicroseconds, System.Net.Sockets.SelectMode.SelectRead))
            {
                continue;
            }

            System.Net.Sockets.Socket handler = this.listener.Accept();

            // Handle the request on a separate thread, so that this thread is available again to receive other requests
            System.Threading.Thread handlerThread = new System.Threading.Thread(new System.Threading.ParameterizedThreadStart(ProcessMonitoringRequest));
            handlerThread.Start(handler);
        }
    }
    catch (System.Threading.ThreadInterruptedException)
    {
        // KillThread interrupts this thread to make it stop; treat that as a normal exit
        // rather than letting it escape as an unhandled exception.
    }
}
/// <summary>
/// Handles a single monitoring connection: reads one ASCII command of at most 256 bytes,
/// answers "get status" with the current state (any other command with an error text)
/// and closes the connection. Failures are traced as warnings and never leave this
/// method, because it is the entry point of its own thread.
/// </summary>
/// <param name="data">The connected <see cref="System.Net.Sockets.Socket"/>, boxed as object.</param>
private void ProcessMonitoringRequest(object data)
{
    System.Net.Sockets.Socket handler = (System.Net.Sockets.Socket) data;
    try
    {
        byte[] requestBytes = new byte[256];

        // Decode only the bytes that were actually received.
        int received = handler.Receive(requestBytes);
        string request = System.Text.Encoding.ASCII.GetString(requestBytes, 0, received);

        // Accept NUL-terminated commands as before, and also the line ending that
        // line-based clients (telnet, netcat) append.
        request = request.TrimEnd('\0', '\r', '\n');

        string response;
        switch (request)
        {
            case "get status":
                // The state is null until the first status has been reported; encoding
                // null would throw, so answer with an explicit text instead.
                response = this.state ?? "Error: Status not available";
                break;
            default:
                response = "Error: Unknown request";
                break;
        }

        byte[] responseBytes = System.Text.Encoding.ASCII.GetBytes(response);
        handler.Send(responseBytes);
    }
    catch (System.Exception ex)
    {
        // An exception that leaves a thread's entry point is unhandled and ends the
        // process; a broken monitoring connection must not take the worker role down.
        System.Diagnostics.Trace.TraceWarning("Monitoring request failed: " + ex.Message);
    }
    finally
    {
        try
        {
            handler.Shutdown(System.Net.Sockets.SocketShutdown.Both);
        }
        catch (System.Net.Sockets.SocketException)
        {
            // The peer already dropped the connection; there is nothing left to shut down.
        }
        finally
        {
            // Always release the socket, even when Shutdown failed.
            handler.Close();
        }
    }
}
#endregion
/// <summary>
/// Publishes a status message, prefixed with the id of the current role instance, as the
/// state of the worker role and optionally writes it to the diagnostics trace.
/// </summary>
/// <param name="message">The status text.</param>
/// <param name="logLevel">Severity of the message; selects the trace method that is used.</param>
/// <param name="log">
/// When true, the message is also traced, provided the configured log limit is at
/// least <paramref name="logLevel"/>.
/// </param>
/// <exception cref="System.ArgumentOutOfRangeException">
/// The message is to be traced and <paramref name="logLevel"/> is not a known level.
/// </exception>
private void ReportAndLog(string message, LogLevel logLevel, bool log)
{
    string instanceId = Microsoft.WindowsAzure.ServiceRuntime.RoleEnvironment.CurrentRoleInstance.Id;
    string line = string.Format("[{0}] {1}", instanceId, message);
    this.state = line;

    // The configured limit is only consulted when tracing was asked for.
    if (!log || this.LogLimit < logLevel)
    {
        return;
    }

    if (logLevel == LogLevel.Fatal || logLevel == LogLevel.Error)
    {
        System.Diagnostics.Trace.TraceError(line);
    }
    else if (logLevel == LogLevel.Warning)
    {
        System.Diagnostics.Trace.TraceWarning(line);
    }
    else if (logLevel == LogLevel.Info || logLevel == LogLevel.Debug)
    {
        System.Diagnostics.Trace.TraceInformation(line);
    }
    else
    {
        throw new System.ArgumentOutOfRangeException("logLevel", "Unknown logging level: " + logLevel);
    }
}
/// <summary>
/// The most verbose level that is written to the diagnostics trace, read from the
/// "logLevel" configuration setting on every access. The setting may hold the numeric
/// value or the (case-insensitive) name of a <see cref="LogLevel"/> member. A missing,
/// empty or unparsable setting yields <see cref="LogLevel.Fatal"/>, so a configuration
/// mistake cannot make the role throw while starting, stopping or logging.
/// </summary>
private LogLevel LogLimit
{
    get
    {
        string logLevelStr = Microsoft.Azure.CloudConfigurationManager.GetSetting("logLevel");

        // Enum.TryParse accepts both "3" and "Info" and returns false for null or
        // garbage instead of throwing.
        LogLevel configured;
        if (System.Enum.TryParse<LogLevel>(logLevelStr, true, out configured))
        {
            return configured;
        }

        // Same result an absent setting produced before: only the most severe messages.
        return LogLevel.Fatal;
    }
}
/// <summary>
/// Placeholder for sending an e-mail alert to all configured administrators; the body is
/// intentionally empty in this example.
/// </summary>
/// <param name="subject">Subject of the alert.</param>
/// <param name="body">Text of the alert.</param>
private static void SendAdministratorAlerts(string subject, string body)
{
// Sends out an email alert to all configured administrators.
// Not detailed in this example.
}
}
/// <summary>
/// Severity of a log message, ordered from most severe (lowest value) to most verbose
/// (highest value). A message is traced when the configured limit is greater than or
/// equal to the level of the message.
/// </summary>
internal enum LogLevel
{
    /// <summary>Traced as an error.</summary>
    Fatal = 0,

    /// <summary>Traced as an error.</summary>
    Error = 1,

    /// <summary>Traced as a warning.</summary>
    Warning = 2,

    /// <summary>Traced as information.</summary>
    Info = 3,

    /// <summary>Traced as information.</summary>
    Debug = 4
}
}