In my previous post, I have implemented Message Filter Pattern. In this post I will be implementing Dynamic Router Pattern which is well explained by Stephen Kaufman here.
Dynamic Router as explained by Kaufman,
“This pattern provides for the scenario in that each recipient would register and list the conditions or rules for which messages or types of messages it can handle. The conditions or rules are stored so that when a message arrives, the rules run and the message parameters are evaluated against the rules and then the message is routed accordingly to the correct receiver. The benefit of this is that you can achieve predictive routing, with eased maintenance and less dependency on each receiver.
This pattern is implemented in Azure by default in both the Topic and the Queues. When you create a subscription and a receiver for a topic you are registering with the service and as part of that registration process you also provide rules or conditions for the types of messages or the value of properties that you want to filter for.
But what if we want to take this a step further and provide even more dynamic functionality. What if we could implement the functionality that is available in BizTalk called Promoted Properties but do it completely within the functionality of Azure. Promoted Properties is a way of taking data contained in the body of an object or message and put that in a properties collection that can then be used to route messages. When working with Azure, the BrokeredMessage object contains a properties collection that can be used to route however, you manually have to set these properties at design time. Our goal is to take the values of properties in the object body itself and promote those to the BrokeredMessage properties collection so that they can be assigned and routed at run time.”
Below is the implementation:
ExposedProperty Attribute
[AttributeUsage(AttributeTargets.Property)]
internal class ExposedPropertyAttribute : Attribute
{
private string _name;
public ExposedPropertyAttribute(string name)
{
this._name = name;
}
public string Name
{
get
{
return this._name;
}
}
}
Person Class:
[Serializable]
public class Person
{
/// <summary>
/// Initializes a new instance of the <see cref="Person" /> class.
/// </summary>
public Person()
{
}
/// <summary>
/// Gets or sets the name.
/// </summary>
/// <value>The name.</value>
public string Name
{
get;
set;
}
/// <summary>
/// Gets or sets the gender.
/// </summary>
/// <value>The gender.</value>
[ExposedProperty("Gender")]
public string Gender
{
get;
set;
}
/// <summary>
/// Returns a <see cref="System.String" /> that represents this instance.
/// </summary>
/// <returns>A <see cref="System.String" /> that represents this instance.</returns>
public override string ToString()
{
return Name;
}
}
Fields :
// TODO : Set the IssueName, IssueKey and ServicebusNamespace variable with proper values before running the code.
static string IssueName = string.Empty;
static string IssueKey = string.Empty;
static string ServicebusNamespace = string.Empty;
static NamespaceManager namespaceManager;
static MessagingFactory messagingFactory;
static TopicDescription topicDescription;
Utilities:
/// <summary>
/// Creates the Topic.
/// </summary>
/// <param name="topicName">The topic name.</param>
/// <returns>TopicDescription</returns>
static TopicDescription CreateTopic(string topicName)
{
TopicDescription topicDescription = new TopicDescription(topicName);
topicDescription.EnableFilteringMessagesBeforePublishing = true;
//Check if already exists.
if (namespaceManager.TopicExists(topicDescription.Path))
{
namespaceManager.DeleteTopic(topicDescription.Path);
}
Console.WriteLine("Creating Topic : " + topicName);
return namespaceManager.CreateTopic(topicDescription);
}
/// <summary>
/// Creates the subscription.
/// </summary>
/// <param name="subscription">The subscription.</param>
static void CreateSubscription(Dictionary<string, string> subscription)
{
if (null != topicDescription)
{
foreach (KeyValuePair<string, string> item in subscription)
{
if (string.IsNullOrEmpty(item.Value))
{
namespaceManager.CreateSubscription(topicDescription.Path, item.Key);
}
else
{
namespaceManager.CreateSubscription(topicDescription.Path, item.Key, new SqlFilter(item.Value));
}
}
}
}
/// <summary>
/// Sends the message.
/// </summary>
/// <param name="message">The message.</param>
static void SendMessage(Person message)
{
TopicClient topicClient = messagingFactory.CreateTopicClient(topicDescription.Path);
BrokeredMessage brokeredMessage = new BrokeredMessage(message);
brokeredMessage = ExposeProperties(brokeredMessage, message);
topicClient.Send(brokeredMessage);
}
/// <summary>
/// Receives the message.
/// </summary>
/// <param name="subscriptionName">Name of the subscription.</param>
static Person ReceiveMessage(string subscriptionName)
{
SubscriptionClient subscriptionClient = messagingFactory.CreateSubscriptionClient(topicDescription.Path, subscriptionName, ReceiveMode.ReceiveAndDelete);
BrokeredMessage receivedMessage = subscriptionClient.Receive(new TimeSpan(0, 0, 15));
return (null != receivedMessage) ? receivedMessage.GetBody<Person>() : null;
}
/// <summary>
/// Exposes the properties.
/// </summary>
/// <param name="message">The message.</param>
/// <param name="obj">The obj.</param>
static BrokeredMessage ExposeProperties(BrokeredMessage message, object obj)
{
PropertyInfo[] properties = obj.GetType().GetProperties();
foreach (PropertyInfo property in properties)
{
foreach (ExposedPropertyAttribute exposedProperty in property.GetCustomAttributes(typeof(ExposedPropertyAttribute), false))
{
var value = property.GetValue(obj, null);
message.Properties.Add("Gender", value);
}
}
return message;
}
Main Implementation:
static void Main(string[] args)
{
#region User details check
if (string.IsNullOrEmpty(IssueName) || string.IsNullOrEmpty(IssueKey) || string.IsNullOrEmpty(ServicebusNamespace))
{
Console.ForegroundColor = ConsoleColor.Red;
Console.WriteLine("One of the below 3 values are missing...");
Console.WriteLine("ServicebusNamespace");
Console.WriteLine("IssuerName");
Console.WriteLine("IssuerKey");
Console.ResetColor();
Console.WriteLine("\nPress any key to receive the message...");
Console.ReadKey();
return;
}
#endregion
#region Create Topic
Uri serviceBusUri = ServiceBusEnvironment.CreateServiceUri("sb", ServicebusNamespace, string.Empty);
TokenProvider tokenProvider = TokenProvider.CreateSharedSecretTokenProvider(IssueName, IssueKey);
namespaceManager = new NamespaceManager(serviceBusUri, tokenProvider);
messagingFactory = MessagingFactory.Create(serviceBusUri, tokenProvider);
topicDescription = CreateTopic("DynamicRouterTopic");
#endregion
#region Create Subscription
Dictionary<string, string> subscriptionItems = new Dictionary<string, string>();
subscriptionItems.Add("Male", "Gender='Male'");
subscriptionItems.Add("Female", "Gender='Female'");
CreateSubscription(subscriptionItems);
#endregion
#region Create Message
List<Person> personList = new List<Person>();
personList.Add(new Person() { Name = "Tom", Gender = "Male" });
personList.Add(new Person() { Name = "Ira", Gender = "Female" });
personList.Add(new Person() { Name = "Julie", Gender = "Female" });
personList.Add(new Person() { Name = "Jim", Gender = "Male" });
personList.Add(new Person() { Name = "Anna", Gender = "Female" });
personList.Add(new Person() { Name = "Alan", Gender = "Male" });
#endregion
#region Send Messages
foreach (Person item in personList)
{
SendMessage(item);
Console.ForegroundColor = ConsoleColor.DarkGreen;
Console.WriteLine("Item " + item.Name + " sent to " + topicDescription.Path);
Console.ResetColor();
}
#endregion
Console.WriteLine("Press any key to receive the message...");
Console.ReadKey();
#region Receive Message
Console.WriteLine("\n\nFemale list:");
while (true)
{
var female = ReceiveMessage("Female");
if (null != female)
{
Console.WriteLine(female.ToString());
}
else
{
break;
}
}
Console.WriteLine("\n\nMale list:");
while (true)
{
var male = ReceiveMessage("Male");
if (null != male)
{
Console.WriteLine(male.ToString());
}
else
{
break;
}
}
#endregion
Console.WriteLine("\nPress any key to exit...");
Console.ReadKey();
}
~BS