Consuming RabbitMQ messages in PowerShell

§ November 18, 2010 17:23 by beefarino |

At the moment I’m preparing another PowerShell module for release to open source – this one makes it dirt simple to wire up a PowerShell script to a RabbitMQ server, so it can participate in distributed messaging or ESB solutions.  I’ve only been working on this for a day, but it’s already proven its value to me and I’m too titillated not to write about it!

A little background: I’ve been working on a distributed queue-driven solution for the last few months.  The client uses a LAMP stack and has chosen RabbitMQ as their queue.  While this has been my only experience with it, I’ve come to love RabbitMQ – easy to set up, very robust, cross-platform, simple client libraries, and awesome documentation and community support.  If you want to check it out, Justin Etheredge has a fantastic get-you-started post.

bunnyAnyway, as distributed solutions and facebook marriage statuses are wont to do, things got complicated.  I’ll save the details for another post.  Suffice it to say that I needed some  RabbitMQ consumers that could perform lots of simple tasks – like report real-time instrumentation messages for a specific job.  So I made a few broad passes and came up with a simple and effective way to do that using PowerShell.

API Overview

At present the API feels a lot like the PowerShell jobs and events - you can consume messages via polling a queue, blocking until a message arrives, or assigning a script block to process the messages as they arrive. 

Starting a message consumer is simple enough:

1 import-module poshrabbit; 2 $q = start-consumer -hostname RabbitMQSvr -exchange posh -routingkey 'prefix.#'

That’s all there is to it.  The consumer is now connected and trying to pull messages.  The variable $q contains data used by the other cmdlets in the API to identify messages retrieved by this consumer.  For instance, if you wanted to block until a message arrived:

1 import-module poshrabbit; 2 $q = start-consumer -hostname RabbitMQSvr -exchange posh -routingkey 'prefix.#' 3 4 #the script blocks on the call to wait-consumer 5 # until a message is received; the message is 6 # returned in the $event variable 7 $msg = wait-consumer $q; 8 9 write-host 'event received: ' $msg;

The wait-consumer cmdlet does for RabbitMQ messages what the wait-event cmdlet does for events.  The script blocks until a message is received by the specified consumer.

Or perhaps a script needs to do other things when no message are available:

1 import-module poshrabbit; 2 $q = start-consumer -hostname RabbitMQSvr -exchange posh -routingkey 'prefix.#' 3 4 #receive-consumer does not block, if no 5 # messages are available $msgs will be $null 6 $msgs = receive-consumer $q; 7 if( -not $msgs ) 8 { 9 write-host 'no messages at this time'; 10 } 11 else 12 { 13 $msgs | write-host; 14 }

In this case receive-consumer will return any messages received since the last call to receive-message; however it will not block script execution – if no messages are available it will return $null immediately.

Or if you just want to fire-and-forget:

1 import-module poshrabbit; 2 3 #splatting added for readability 4 $a=@{ 5 hostname = 'RabbitMQSvr'; 6 exchange = 'posh'; 7 routingkey = 'prefix.#'; 8 9 #this script block will be run for 10 #each message received; no additional 11 #polling or waiting code is needed 12 action = { $_ | write-host }; 13 }; 14 $q = start-consumer @a;

Assigning a scriptblock to start-consumer’s Action parameter will automagically run the scriptblock for each incoming message.  No need to poll or wait, your PowerShell session can move on to other tasks and still process each message as it comes in, in real-time!

Eventually you’ll want to stop the consumer:

1 import-module poshrabbit; 2 3 #splatting added for readability 4 $a=@{ 5 hostname = 'RabbitMQSvr'; 6 exchange = 'posh'; 7 routingkey = 'prefix.#'; 8 }; 9 $q = start-consumer @a; 10 # ... 11 stop-consumer $q;

Simple enough, just tell stop-consumer which consumer you want to stop.  Or end your PowerShell session and the module will clean up the RabbitMQ resources for you.

Oh, and you’ll probably want to publish messages back to a RabbitMQ exchange at some point, so I added a simple way to do that via the publish-string cmdlet:

1 import-module poshrabbit; 2 3 #splatting added for readability 4 $a=@{ 5 hostname = 'RabbitMQSvr'; 6 exchange = 'posh'; 7 routingkey = 'prefix.batch-0_o'; 8 }; 9 publish-string @a -message 'hello world!'

There’s more to each of these cmdlets (e.g., username, passwords, the option to specify exchange types or queue names, a timeout on wait-consumer), but in terms of functionality that’s it.  I’m pretty happy with the API; I think it’s simple, readable, and proven to be powerful in the 8 hours I’ve been dogfooding it. 


Here’s a sample script that collates instrumentation messages for a specific “batch” of work for my client:

1 param( [string]$batchId ); 2 3 import-module poshrabbit; 4 5 $a=@{ 6 hostname = 'Que'; 7 exchange = 'LSI.PP.Instrumentation'; 8 routingkey = 'Batch.' + $batchId; 9 action = { $_ | write-host }; 10 }; 11 12 write-host "Real-time activity for batch $batchId:"; 13 $q = start-consumer @a;

Next Steps

What really gets me excited are the possibilities – the notion of load-balancing PowerShell tasks across several machines … coordinating the efforts of a deployment script and a build server … service-oriented PowerShell scripts that run on-demand … or for that matter, use RabbitMQ as the foundation for a load-balanced psake CI environment.

But first thing’s first – I want to get this module clean and pretty and put it someplace where you all can get at it and make it better.  My hope is to do that in the next day or so (clients willing).

mongodb log4net appender

§ October 1, 2010 07:26 by beefarino |

Hey everybody!

Just wanted to let you know about this little gem, in case you’re having the same love affair with document databases that I am at the moment…

Jozef Sevcik has generously released an open-source log4net appender that logs to a mongodb!  Looks super-easy - I’m currently folding this in to project and will let you know how it goes.

Until then, please check out his great work on GitHub!  Thanks Jozef!

Decorator Unity Container Extension

§ September 10, 2010 02:09 by beefarino |

I’ve been using Unity as the DI container in a client project.  I like the ability to configure the container at runtime, and I like the fluent mapping registration interface, but I don’t like how intimate I need to be with the Unity design to do things.  For instance, configuring the container to support a decorator or chain object pattern requires some non-intuitive container configuration code.

For instance, consider this simple object graph:

1 public interface IContract 2 { 3 } 4 5 public class Contract : IContract 6 { 7 } 8 9 public class ContractDecorator : IContract 10 { 11 public IContract Base { get; set; } 12 13 public ContractDecorator(IContract @base) 14 { 15 Base = @base; 16 } 17 }

The IContract interface defines the contract, which is empty in this case for simplicity.  The Contract class defines a concrete implementation, and the ContractDecorator is a decorating object that needs an IContract implementation during initialization.

My intent is to register this relationship in my IoC container, so when I ask for an IContract, I get a Contract instance wrapped in a ContractDecorator instance.  In short, the same result as this code:

1 var contract = new ContractDecorator( 2 new Contract() 3 );

If this were Castle Windsor, it would be easy to do:

1 var container = new WindsorContainer(); 2 var kernel = container.Kernel; 3 // ... 4 kernel.AddComponent<IContract, ContractDecorator>(); 5 kernel.AddComponent<IContract, Contract>();

Castle allows me to stack my type mappings using a simple convention: during the object graph buildup, dependencies of type IContract resolve using the next type mapping added to the component stack.  In this case when I ask Castle for an IContract, I end up getting a ContractDecorator instance initialized with a Contract instance.  It just works, the way I expect it to.

Unity is not so just-worky-the-way-I-expecty in this case.  In fact, making this work can be pretty esoteric.  I attempted something based on an example from David Hayden (see this):

1 var container = new UnityContainer(); 2 container.RegisterType( 3 typeof( IContract ), 4 typeof( Contract ), 5 "Contract" 6 ); 7 contract.RegisterType( 8 typeof( IContract ), 9 typeof( ContractDecorator ), 10 new InjectionConstructor( 11 new ResolvedParameter( 12 typeof( IContract ), 13 "Contract" 14 ) 15 ) 16 ); 17 var contract = container.Resolve<IContract>();

I think this code is obtuse, and having to use a magic string feels fragile.  Much of it is there only to support Unity’s internal model, not my object model, and I’m not terribly interested in Unity’s internal model.  I use this object pattern quite a bit, and I’d rather Unity support a simple convention the way Castle Windsor does –  so I coded up a Unity Container Extension to make it happen.

The Container Extension

Here is the container extension code:

1 public class DecoratorContainerExtension 2 : UnityContainerExtension 3 { 4 private Dictionary<Type, List<Type>> _typeStacks; 5 protected override void Initialize() 6 { 7 _typeStacks = new Dictionary<Type, List<Type>>(); 8 Context.Registering += AddRegistration; 9 10 Context.Strategies.Add( 11 new DecoratorBuildStrategy(_typeStacks), 12 UnityBuildStage.PreCreation 13 ); 14 } 15 16 private void AddRegistration( 17 object sender, 18 RegisterEventArgs e) 19 { 20 if (!e.TypeFrom.IsInterface) 21 { 22 return; 23 } 24 25 List<Type> stack = null; 26 if (!_typeStacks.ContainsKey(e.TypeFrom)) 27 { 28 stack = new List<Type>(); 29 _typeStacks.Add(e.TypeFrom, stack); 30 } 31 else 32 { 33 stack = _typeStacks[e.TypeFrom]; 34 } 35 36 stack.Add(e.TypeTo); 37 } 38 }

The container extension does two things:

  1. It tracks type registrations for interfaces, building up an ordered list of types that have been registered to each interface (lines 20-36).
  2. It initializes a new DecoratorBuildStrategy and adds it to the chain of build-up strategies (lines 10-13).

The Build Strategy

The build strategy is where the magic happens:

1 public class DecoratorBuildStrategy : BuilderStrategy 2 { 3 private readonly Dictionary<Type, List<Type>> _typeStacks; 4 5 public DecoratorBuildStrategy( 6 Dictionary<Type, List<Type>> typeStacks 7 ) 8 { 9 _typeStacks = typeStacks; 10 } 11 12 public override void PreBuildUp(IBuilderContext context) 13 { 14 var key = context.OriginalBuildKey; 15 16 if (!(key.Type.IsInterface && 17 _typeStacks.ContainsKey(key.Type))) 18 { 19 return; 20 } 21 22 if (null != context.GetOverriddenResolver(key.Type)) 23 { 24 return; 25 } 26 27 Stack<Type> stack = new Stack<Type>( 28 _typeStacks[key.Type] 29 ); 30 31 object value = null; 32 stack.ForEach( 33 t =>{ 34 value = context.NewBuildUp( 35 new NamedTypeBuildKey(t, key.Name) 36 ); 37 var overrides = new DependencyOverride( 38 key.Type, 39 value 40 ); 41 context.AddResolverOverrides(overrides); 42 } 43 ); 44 45 context.Existing = value; 46 context.BuildComplete = true; 47 } 48 }
The build strategy’s PreBuildUp method uses the type lists from the container extension to iteratively build up instances of the interface (lines 32-43).  In each iteration of build-up, a DependencyOverride redefines the object used to resolve the interface type (lines 37-41).  In effect, this “walks the type stack” associated with the interface, allowing the least-dependent implementation to be created first, then injected into the next least-dependent implementation, and so on.

The algorithm makes use of the Unity IoC to resolve dependencies during each step of the “type stack walk” (the NewBuildUp method call on line 34).  This means that each object type in the stack will be constructed using the same IoC context, so additional dependencies will be available to them during creation.  It also means that this build strategy is recursive, so special care must be taken to prevent stack smashing – line 22 checks if the DependencyOverride is in effect for the interface type, skipping the “type stack walk” and short-circuiting an endless recursive call chain.

The Extension in Use

This is where things get easy – simply add the DecoratorContainerExtension to the UnityContainer, and then you can use the same convention available in the Castle Windsor container:

1 [Fact] 2 public void CreatesTheMostDependentType() 3 { 4 var c = new UnityContainer() 5 .AddExtension(new DecoratorContainerExtension()) 6 .RegisterType<IContract, ContractDecorator>() 7 .RegisterType<IContract, Contract>(); 8 9 var o = c.Resolve<IContract>(); 10 Assert.NotNull(o); 11 Assert.IsType(typeof(ContractDecorator), o); 12 Assert.IsType( 13 typeof(Contract), 14 ((ContractDecorator)o).Base 15 ); 16 }

This is simple, it’s less code, and most importantly it describes my intent better.  And it’s focused on my object model and not Unity’s, so I can create arbitrarily complex decorator chains without any fuss.  I’m fairly happy with it.


MEFfing Disappointed

§ September 2, 2010 08:04 by beefarino |

Working on a highly modular service offering this week, and I took a looksy at the Microsoft Extension Framework as a way to wireup the service components.  I made the choice to pass MEF over, but it was a tough call.  It’s not that MEF doesn’t offer enough features; quite to the contrary I’m pleased with the focus of its feature set.  The reason I didn’t use it was simply that is rubs me the wrong way.  Lemmie splain, and yes this post is a little persnickety, but maybe someone can set me straight…

Things I Don’t Like

MEF requires decoration of dependency objects, and I don’t see the point to that.  Here’s an example of what I mean from the MEF wiki:

1 [Export(typeof(IMessageSender))] 2 public class EmailSender : IMessageSender { 3 ... 4 } 5 6 [Export(typeof(IMessageSender))] 7 public class TCPSender : IMessageSender { 8 ... 9 }

I fail to understand why the Export attribute is necessary in this standard (and according to the MEF wiki, recommended) usage pattern.  I see many issues here…

The ExportAttribute simply duplicates information that is already available via the .NET typing system.  By definition, any public type is exported by the assembly that contains it.  If you argue that you may have public types that you don’t want to export for consumption, I would want to know why they’re public instead of internal types.  So why would I want yet another vector on my type that just duplicates information I already have?

Someone pointed out the ExportMetadataAttribute as a counter to this argument.  I responded by asking why the export metadata shouldn’t be expressed as properties on the object being exported.

It’s not that I mind contract-based object graph wireups like this, but I don’t like that the contract is part of the type definition.  I want to allow the object that is consuming this dependency to choose how to consume it, not the other way around.  In fact, cramming the wireup contract into the type definition is a cost, not a benefit – if you need to modify the wireup, you need to recomplie both the consuming and dependency objects.  Why would I want that when I can specify the wireup elsewhere, like an IoC assembly or XML file?

In short, setting up these exports seems like duplicate work with no payoff.

Things I Do Like (and Some I Would Change)

I like the assembly catalog concept in MEF – of building a list of assemblies the consumer deems appropriate for importing its dependencies.  I like that because enables the application behavior to be dynamic, which is really the point of frameworks like these isn’t it? 

I also like the idea of decorating a consuming object with attributes to describe what they want or need.  Well, sort of.  At first the ImportAttribute and her sisters looked a lot like the ExportAttribute, and I had the same qualms about them:

1 public class Notifier { 2 [ImportMany] 3 public IEnumerable<IMessageSender> Senders {get; set;} 4 public void Notify(string message) { 5 foreach(IMessageSender sender in Senders) 6 sender.Send(message); 7 } 8 }

If the ImportManyAttribute were missing, would anyone looking at the object not know that it needs a collection of IMessageSenders?  Of course not.  So at first I thought it was another redundant framework construct.  Then I realized that it wasn’t expressing a dependency of the object, it was marking a object dependency as a point for extensibility.

In other words, the ImportAttribute says “you can use whatever type you find in the dynamic catalog here.”  That’s beyond expressing a dependency – it’s defining how that dependency should be resolved.  I don’t mind that as much, and I’m even okay with having that expressed in the type definition.  I think I would still prefer expressing the dependency resolution outside of the consuming type, but I’m warming up to the idea.

However, if MEF wants me to work this way I see some significant improvements that can be made.  First and foremost, support for standard object patterns.  E.g., I would prefer to implement the Notifier example this way:

1 public class Notifier { 2 [ImportAsComposite] 3 public IMessageSender Sender {get; set;} 4 public void Notify(string message) { 5 Sender.Send(message); 6 } 7 }

It’s less code, and my intent is just as clear.  Moreover, I would want to be able to apply decorators:

1 public class Notifier { 2 [Import] 3 [DecorateWith(InstrumentingMessageSender)] 4 public IMessageSender Sender {get; set;} 5 public void Notify(string message) { 6 Sender.Send(message); 7 } 8 }

and chains of responsibility:

1 public class Notifier { 2 [ImportAsChain] 3 public IMessageSender Sender {get; set;} 4 public void Notify(string message) { 5 Sender.Send(message); 6 } 7 }

oh and of course default behaviors for when there is no import or chain or whatever:

1 public class Notifier { 2 [ImportAsChain] 3 [Default(NullMessageSender)] 4 public IMessageSender Sender {get; set;} 5 public void Notify(string message) { 6 Sender.Send(message); 7 } 8 }

.. etc, etc, etc.  These are just ideas though, I have no code to back them up, but I certainly want it.  I like these because they help the consuming object express what it wants and how it will behave, and they realistically require nothing unique on the dependency objects.

You smell that too?  It’s another project brewing…