Trying out NServiceBus part 3: pushing an event

In previous posts I’ve shown how I set up a testing environment for trying out NServiceBus. After dealing with some problems, the applications started to work correctly, at least to some extent. The only problem left is that the event is being published, but it does not always reach the handler. Why is that?

I’ve set up my solution to start both the Mailer and nServiceBusFun projects when I hit F5 (you can do it by right-clicking on the solution and selecting “Set StartUp projects…”). Sometimes when I do that, the message is sent from the publisher and delivered to the handler correctly, but sometimes it’s not. The reason is that we have a race condition here. Simply put: if Mailer fails to register (subscribe) before nServiceBusFun publishes the event, the event is not delivered, which is pretty natural. NServiceBus cannot hold on to every published event and push it to handlers that register at some point in the future. So I modified my publisher slightly, to send the event message after the user presses any key on the keyboard.

This way I can simply wait until Mailer registers successfully and then post the event to the service bus. And it works great: the message is sent, the handler receives it and displays the appropriate message.

As you can see, I’ve made it so I can send up to 5 messages (just for testing), each after a key press. But let’s see what happens if the Mailer application crashes while events are being published. After publishing an event, check that it is handled by the subscriber correctly. Then close the subscriber application and send a few more events from the publisher. Nothing bad happens, which is OK: the publisher is not affected in any way by the subscriber’s failure, which is definitely what we want. But such a failure must not cause messages to be lost. Those events might indicate that something important has happened and that some action needs to be taken in response, so it could be very problematic if they were never handled. So let’s restart the Mailer application by right-clicking on the Mailer project and starting a new instance from the Debug menu. What you can see is that all the messages published in the meantime are delivered as soon as the subscriber finishes initialization. This is a great feature that takes handling subscriber failures, and the need to store those messages manually, off our minds. As far as I understand it, the messages wait in the subscriber’s durable input queue while it is down, and NServiceBus uses RavenDB internally to persist things like subscriptions, so the publisher still knows where to send events.

Let’s stop for now. This is just the tip of the iceberg that NServiceBus is, and I hope to learn a lot more about it and hopefully use it in production one day. And of course I will try to share everything I learn about it here.


Trying out NServiceBus part 2: No endpoint configuration found in scanned assemblies

In the previous post I’ve shown how I set up my testing environment to have some fun with NServiceBus. In the end everything seemed to be OK, yet when I tried to start the application an exception was thrown saying that no endpoint configuration had been found, even though I had created one.

First I googled my issue to find out what was going on, and immediately found this thread on Stack Overflow: The dreaded “No endpoint configuration found in scanned assemblies” NServiceBus error. As the answer to the question states, this exception can be thrown in numerous situations, leaving it to the developer to find out which reason is the actual one in each case.

After confirming that my assembly’s .NET version and the NServiceBus target version are the same (.NET 4.0 in my case), I decided to follow the hint provided by the exception and the SO answer: create a NServiceBus.Host.exe.config file and explicitly point to the class that should be used for configuration. The file content is simple:
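A minimal sketch of what such a file could look like for the Mailer project. The key name is the one given in the exception message; the value Mailer.EndpointConfig, Mailer is my assumption about the namespace, class and assembly names, so adjust it to your own project:

```xml
<?xml version="1.0" encoding="utf-8"?>
<configuration>
  <appSettings>
    <!-- value format: Namespace.ClassName, AssemblyName (names here are assumed) -->
    <add key="EndpointConfigurationType" value="Mailer.EndpointConfig, Mailer" />
  </appSettings>
</configuration>
```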

The same file (with the value changed to nServiceBusFun.EndpointConfig, nServiceBusFun) was created in the publisher project. The value is simply namespace.className, assemblyName.

And this did in fact solve the issue from the title. But it introduced another one: BadImageFormatException. But hey! We are one step closer!
Quick googling for this exception together with NServiceBus led me to this page: New NServiceBus Feature: 32-bit (x86) Host Process, which explains why the exception is thrown. Since I’m working on a 64-bit system, NServiceBus tries to load x64 assemblies. However, by default my projects were set to x86 mode. So I’ve changed them to Any CPU to make sure they work correctly with x64 libraries.
And voilà! Mailer works: it started correctly, and after a few seconds NServiceBus reported that the queue had been created and the application was waiting for messages to come.

But this left me with the nServiceBusFun project, which I’ve fixed in the same way, but now it started throwing an exception saying that the *.vshost.exe file cannot be loaded, and also giving a hint about how to fix this.
Exception when starting endpoint, error has been logged. Reason: Could not load C:\Users\Pako\documents\visual studio 2010\Projects\nServiceBusFun\nServiceBusFun\bin\Debug\nServiceBusFun.vshost.exe. Consider using 'Configure.With(AllAssemblies.Except("nServiceBusFun.vshost.exe"))' to tell NServiceBus not to load this file.
So I’ve updated my configuration class to implement the IWantCustomInitialization interface, which makes it possible to modify the configuration in code. So now the file looks like this:

which is exactly what the exception said it should be. Great! Now it starts correctly, configures everything successfully and tries to send a message. But the message is not always delivered. Why? I will discuss it in the next blog post in this series.

Trying out NServiceBus

Recently I’ve been interested in building software by following the Domain Driven Design approach. I haven’t been able to try my knowledge on any business project yet, but that didn’t stop me (and should not stop you, dear reader) from looking around for interesting solutions to common problems. One of these problems is communication between Bounded Contexts and signalling that some actions have been performed. The concept of Domain Events seems very natural here. But you need some way to easily propagate such events between elements of your system, making sure they reach all interested parts. Here comes NServiceBus. So I wanted to try it out and create a simple test application to see how easy it is to start working with this library. Well, it is harder than I expected, hence this blog post for you, dear reader, and mostly for me for future reference.

First you need to download NServiceBus and install it on your machine. There is a free basic licence available that allows you to send 1 message per second, which seems enough for testing purposes. There are also other licences available, so you can choose the one that suits your needs. But you don’t need to download the installer from their page, since there is a very handy NuGet package that will install NServiceBus and all its dependencies for you.

I’ve created a very simple solution with 3 projects in it: Mailer, which will be used as the message handler, faking an e-mail sending service; Messages, which is shared between the handlers and the main application and defines the message contracts; and nServiceBusFun, the application that will be sending events. That should be enough for my purposes. I’ve installed NServiceBus in both the Mailer and nServiceBusFun projects. The Messages project only needed a reference to the NServiceBus.Interfaces library.

As a next step I’ve created a simple event message that pretends to be a business event that could happen in a business system.
Nothing fancy here, just a simple class that implements the IMessage interface (which comes from NServiceBus.Interfaces), and it’s great that creating a message is that simple. Of course there is more you can do with messages, but for trying it out this is enough.

After that I’ve created EndpointConfig.cs files, one in the Mailer project and one in the nServiceBusFun project. Those classes provide NServiceBus with the configuration for the endpoints, and both files are very similar, as you can see below.

One is simply marked as a server (this is Mailer, since it receives messages from clients) and the other is marked as a publisher (nServiceBusFun, since it’s posting events to the service bus).

So we have a message, now let’s create a handler. I’ve created a ClientRegisteredHandler.cs file with this content:

No thrills here either; the interface tells NServiceBus that this class can handle messages of the given type (in this case ClientRegistered). The Handle method is invoked when NServiceBus gets such a message. Simple and clean, just like we want it.

Our system would not give us much insight into NServiceBus if we did not create a class to publish events. So in the nServiceBusFun project I’ve added this simple class:

Also pretty self-explanatory. NServiceBus will instantiate an object of this class on startup and call its Run() method.

It seems that everything we wanted to do is ready, doesn’t it? If we run both the Mailer and nServiceBusFun applications, it should be OK: our client will start and post an event informing all interested parts of the system that a new client has just registered, and the Mailer application will get this message and pretend to send an e-mail.

But it does not work, at least for me. What I get is an exception on start of the nServiceBusFun application saying that it could not find the endpoint configuration. By default NServiceBus searches through all libraries in the runtime folder, looking for classes that implement the IConfigureThisEndpoint interface. I did this in the EndpointConfig class, which is in the same assembly as the other parts of the application, so it should work, but it does not.

No endpoint configuration found in scanned assemblies. This usually happens when NServiceBus fails to load your assembly containing IConfigureThisEndpoint. Try specifying the type explicitly in the NServiceBus.Host.exe.config using the appsetting key: EndpointConfigurationType, Scanned path: C:\Users\Pako\documents\visual studio 2010\Projects\nServiceBusFun\Mailer\bin\Debug\

In the next post I will try to explain why, and how to fix this.

Synchronization with async/await

Do you always want to block a whole thread while waiting for some resource? Probably not. Why not benefit from the async/await constructs available in .NET 4.5? But you cannot simply await a ManualResetEvent or a Semaphore. Here comes a great series of posts by Stephen Toub on how to implement async-enabled synchronization mechanisms.

It starts with ManualResetEvent and then continues for a total of 7 blog posts. A must-read for all async/await fans out there.

Just start from http://blogs.msdn.com/b/pfxteam/archive/2012/02/11/10266920.aspx and move forward.
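To give a taste of the idea, here is a minimal sketch of an awaitable manual-reset event built on TaskCompletionSource, roughly along the lines of the first post in that series. It assumes .NET 4.5; the class name AsyncManualResetEvent and the AsyncWaitDemo helper are just names I picked for illustration, and this is a simplified version, not the definitive implementation from the series:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public sealed class AsyncManualResetEvent
{
    // A completed task means "signalled"; a pending task means "not signalled".
    private TaskCompletionSource<bool> _tcs = new TaskCompletionSource<bool>();

    // Callers await this task instead of blocking a thread.
    public Task WaitAsync()
    {
        return Volatile.Read(ref _tcs).Task;
    }

    // Completes the current task, releasing everyone awaiting it.
    public void Set()
    {
        Volatile.Read(ref _tcs).TrySetResult(true);
    }

    // Swaps in a fresh, pending task, but only if the current one is completed.
    public void Reset()
    {
        while (true)
        {
            TaskCompletionSource<bool> current = Volatile.Read(ref _tcs);
            if (!current.Task.IsCompleted)
                return; // already in the "not signalled" state
            if (Interlocked.CompareExchange(ref _tcs, new TaskCompletionSource<bool>(), current) == current)
                return; // we replaced it; otherwise another thread raced us, so retry
        }
    }
}

public static class AsyncWaitDemo
{
    // Hypothetical consumer: waits for the event without blocking a thread.
    public static async Task<string> WaitForSignalAsync(AsyncManualResetEvent gate)
    {
        await gate.WaitAsync();
        return "signalled";
    }

    public static void Main()
    {
        var gate = new AsyncManualResetEvent();
        Task<string> waiter = WaitForSignalAsync(gate);

        Console.WriteLine("Completed before Set: " + waiter.IsCompleted);
        gate.Set();
        Console.WriteLine("Result after Set: " + waiter.Result);
    }
}
```

One thing to keep in mind with this simple version: TrySetResult may run the awaiting continuations synchronously on the thread that calls Set(), which the original series discusses in more depth.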

Have fun!

Synchronizing execution with ManualResetEvent

Yesterday I was asked a question:

We have one thread that’s starting 200 additional calculation threads. How can we synchronize execution of the main thread so that it will wait until all 200 worker threads complete?

My first answer was: use the Task Parallel Library, queue 200 Tasks and call Task.WaitAll() on them. Easy, straightforward and very readable. Yet that was not exactly what was expected, since using TPL was for some reason banned ;)
Well then, we have all those fancy synchronization mechanisms, so why not use them? Semaphore, Monitor, ManualResetEvent, AutoResetEvent: those were my first ideas. So I was asked to explain ManualResetEvent in more detail.
Then things got a little more awkward for me. Some time ago I knew how to use them, and this would have given me no problem. But since time has passed, my mind decided that the implementation details were not needed any more and garbage collected them. So I started explaining how I thought it works. Enough said, I messed up pretty much everything there was to mess up. So to make up for it I decided to blog about it.

Here we have some basic implementation:

        private static void ThreadingProc()
        {
            const int calculationsCount = 200;

            for (int i = 0; i < calculationsCount; i++)
            {
                ThreadPool.QueueUserWorkItem(new WaitCallback(calculation), i);
            }

            Console.WriteLine("Calculation completed");
            Console.ReadKey();
        }

        private static void calculation(object data)
        {
            System.Threading.Thread.Sleep(100);
            Console.WriteLine("Completed calculation " + (int)data);
        }

We just queue 200 work items on the ThreadPool, pass each one its number and display a completion message. Each work item simulates some work and then displays its own completion message as well.
Result is far from expected and looks something like this:

Calculation completed
Completed calculation 1
Completed calculation 2
Completed calculation 3
...

But what else would we expect from unsynchronized parallel execution? So let’s use ManualResetEvent to help us out.
Since our computation method can get only one parameter of object type, we will first create container class to prepare all parameters and then pass them to method:

        class TaskInfo
        {
            public int TaskId { get; set; }
            public ManualResetEvent Done { get; set; }

            public TaskInfo(int taskId, ManualResetEvent done)
            {
                this.TaskId = taskId;
                this.Done = done;
            }
        }

And then slightly modified code:

        private static void ThreadingProc()
        {
            const int calculationsCount = 200;
            ManualResetEvent[] sync = new ManualResetEvent[calculationsCount];

            for (int i = 0; i < calculationsCount; i++)
            {
                ManualResetEvent done = new ManualResetEvent(false);
                sync[i] = done;
                TaskInfo ti = new TaskInfo(i, done);
                ThreadPool.QueueUserWorkItem(new WaitCallback(calculation), ti);
            }

            WaitHandle.WaitAll(sync);

            Console.WriteLine("Calculation completed");
            Console.ReadKey();
        }

        private static void calculation(object data)
        {
            TaskInfo ti = (TaskInfo)data;
            System.Threading.Thread.Sleep(100);
            Console.WriteLine("Completed calculation " + ti.TaskId);
            ti.Done.Set();
        }

Works like a charm… Well, kind of. As you will probably notice, the application will end with an exception (NotSupportedException) saying that WaitHandle.WaitAll supports at most 64 wait handles. That’s not going to stop us for long, is it?

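            // Requires "using System.Linq;" for Skip/Take/ToArray.
            // WaitAll accepts at most 64 handles, so wait for them in batches.
            for (int i = 0; i < calculationsCount; i += 64)
            {
                WaitHandle.WaitAll(sync.Skip(i).Take(64).ToArray());
            }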

This simple modification takes batches of 64 ManualResetEvents and waits for them. When a batch has completed, it waits for the next one, and so on. Since we want to wait for all threads to complete, this gives us exactly what we want with a simple LINQ query.
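If one handle per work item feels heavy, a single counter can do the same job. Below is a minimal sketch using CountdownEvent from System.Threading, assuming .NET 4 or later and assuming that only Task itself is off limits; CountdownDemo and RunAll are names made up for this example, not part of the original code:

```csharp
using System;
using System.Threading;

public static class CountdownDemo
{
    // Queues `count` work items on the ThreadPool and blocks until all of them
    // have signalled. Returns how many work items actually ran.
    public static int RunAll(int count)
    {
        int completed = 0;

        // The event starts at `count` and becomes signalled when it reaches zero.
        using (var allDone = new CountdownEvent(count))
        {
            for (int i = 0; i < count; i++)
            {
                ThreadPool.QueueUserWorkItem(state =>
                {
                    try
                    {
                        Thread.Sleep(100); // simulate some work
                        Interlocked.Increment(ref completed);
                    }
                    finally
                    {
                        allDone.Signal(); // count down once per work item
                    }
                });
            }

            allDone.Wait(); // a single wait, so the 64-handle limit does not apply
        }

        return completed;
    }

    public static void Main()
    {
        int finished = RunAll(200);
        Console.WriteLine("Calculation completed, work items finished: " + finished);
    }
}
```

Compared to the batched WaitAll, this needs only one synchronization object no matter how many work items are queued.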
And how would it look using TPL?

        private static void TaskProc()
        {
            const int calculationsCount = 200;
            Task[] tasks = new Task[calculationsCount];

            for (int i = 0; i < calculationsCount; i++)
            {
                tasks[i] = new Task(calculationTask, i);
                tasks[i].Start();
            }

            Task.WaitAll(tasks);
        }

        private static void calculationTask(object data)
        {
            System.Threading.Thread.Sleep(100);
            Console.WriteLine("Completed calculation " + (int)data);
        }

So it is quite similar, yet more readable for me. And a little faster (for 200 calculations, without console access, Tasks took about 2.5 s on my machine, while ThreadPool needed 3.4 s to complete), but don’t take my word for it, it was just a quick mini benchmark. But mini benchmarks are fun! We should all do them sometimes, just for the sake of doing them :)
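If you want to repeat such a mini benchmark yourself, a rough sketch could look like the one below. It is not the code I measured with: MiniBenchmark, Measure and the two Run methods are hypothetical names, and the ThreadPool variant uses a CountdownEvent instead of batches of ManualResetEvents to keep the example short. Expect the numbers to vary a lot between machines and runs:

```csharp
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

public static class MiniBenchmark
{
    // Runs the given action once and returns how long it took.
    public static TimeSpan Measure(Action action)
    {
        Stopwatch watch = Stopwatch.StartNew();
        action();
        watch.Stop();
        return watch.Elapsed;
    }

    // Simulated calculation, without console access.
    private static void Work()
    {
        Thread.Sleep(100);
    }

    // TPL variant: start `count` tasks and wait for all of them.
    public static void RunWithTasks(int count)
    {
        var tasks = new Task[count];
        for (int i = 0; i < count; i++)
        {
            tasks[i] = Task.Factory.StartNew(Work);
        }
        Task.WaitAll(tasks);
    }

    // ThreadPool variant: queue `count` work items and wait for a shared counter.
    public static void RunWithThreadPool(int count)
    {
        using (var allDone = new CountdownEvent(count))
        {
            for (int i = 0; i < count; i++)
            {
                ThreadPool.QueueUserWorkItem(_ => { Work(); allDone.Signal(); });
            }
            allDone.Wait();
        }
    }

    public static void Main()
    {
        const int calculationsCount = 200;
        Console.WriteLine("Tasks:      " + Measure(() => RunWithTasks(calculationsCount)));
        Console.WriteLine("ThreadPool: " + Measure(() => RunWithThreadPool(calculationsCount)));
    }
}
```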

Maybe this post will help someone someday. Or maybe just me in the future.
See you next time!