Bhavya

Code -> Break -> Fix -> Blog

Implementing Message Filter Pattern using Windows Azure Service Bus

1 Comment

In my previous post, I have implemented Content Based Router Patter. In this post I will be implementing Message Filter Pattern which is well explained by Stephen Kaufman here.

This pattern requires that any message that doesn’t match the criteria of a subscription will be discarded.  This pattern is used to eliminate any messages that aren’t routed to a subscriber such as those setup using the Content Based Router pattern.  If the message doesn’t fall into a filter used by the subscriber then the message should be routed to a filter channel.

Below is the implementation.

Person Class:


/// <summary>
 /// Class Person
 /// </summary>
 [Serializable]
 public class Person
 {
 /// <summary>
 /// Initializes a new instance of the <see cref="Person" /> class.
 /// </summary>
 public Person()
 {
 }

/// <summary>
 /// Gets or sets the name.
 /// </summary>
 /// <value>The name.</value>
 public string Name
 {
 get;
 set;
 }

/// <summary>
 /// Gets or sets the hobbies. 
 /// comma separated hobby names.
 /// </summary>
 /// <value>The hobbies.</value>
 public string Hobbies
 {
 get;
 set;
 }

/// <summary>
 /// Returns a <see cref="System.String" /> that represents this instance.
 /// </summary>
 /// <returns>A <see cref="System.String" /> that represents this instance.</returns>
 public override string ToString()
 {
 return Name;
 }
 }

Fields:


// TODO : Set the IssueName, IssueKey and ServicebusNamespace variable 
// with proper values before running the code.
 static string IssueName = string.Empty;
 static string IssueKey = string.Empty;
 static string ServicebusNamespace = string.Empty;

static NamespaceManager namespaceManager;
 static MessagingFactory messagingFactory;
 static TopicDescription topicDescription;

Utilities:


static TopicDescription CreateTopic(string topicName)
{
TopicDescription topicDescription = new TopicDescription(topicName);
topicDescription.EnableFilteringMessagesBeforePublishing = true;

//Check if already exists.
if (namespaceManager.TopicExists(topicDescription.Path))
{
namespaceManager.DeleteTopic(topicDescription.Path);
}

Console.WriteLine("Creating Topic : " + topicName);
return namespaceManager.CreateTopic(topicDescription);
}

/// <summary>
/// Creates the subscription.
/// </summary>
/// <param name="subscription">The subscription.</param>
static void CreateSubscription(Dictionary<string, string> subscription)
{

if (null != topicDescription)
{
foreach (KeyValuePair<string, string> item in subscription)
{
if (string.IsNullOrEmpty(item.Value))
{
namespaceManager.CreateSubscription(topicDescription.Path, item.Key);
}
else
{
namespaceManager.CreateSubscription(topicDescription.Path, item.Key, new SqlFilter(item.Value));
}
}
}
}

/// <summary>
/// Sends the message.
/// </summary>
/// <param name="message">The message.</param>
/// <param name="item">The item.</param>
static void SendMessage(Person message, KeyValuePair<string, object> item)
{
try
{
TopicClient topicClient = messagingFactory.CreateTopicClient(topicDescription.Path);

BrokeredMessage brokeredMessage = new BrokeredMessage(message);

if (null != item.Key && null != item.Value)
{
brokeredMessage.Properties.Add(item);
}

topicClient.Send(brokeredMessage);
Console.ForegroundColor = ConsoleColor.DarkGreen;
Console.WriteLine("Item " + message.ToString() + " sent to " + topicDescription.Path);
Console.ResetColor();
}
catch (NoMatchingSubscriptionException)
{
Console.ForegroundColor = ConsoleColor.Red;
Console.WriteLine("No matching subscription for " + message.ToString());
Console.ResetColor();
}
}

/// <summary>
/// Receives the message.
/// </summary>
/// <param name="subscriptionName">Name of the subscription.</param>
static Person ReceiveMessage(string subscriptionName)
{
SubscriptionClient subscriptionClient = messagingFactory.CreateSubscriptionClient(topicDescription.Path, subscriptionName, ReceiveMode.ReceiveAndDelete);

BrokeredMessage receivedMessage = subscriptionClient.Receive(new TimeSpan(0, 0, 3));

return (null != receivedMessage) ? receivedMessage.GetBody<Person>() : null;
}

Implementation:


static void Main(string[] args)
{

if (string.IsNullOrEmpty(IssueName) || string.IsNullOrEmpty(IssueKey) || string.IsNullOrEmpty(ServicebusNamespace))
{
Console.ForegroundColor = ConsoleColor.Red;
Console.WriteLine("One of the below 3 values are missing...");
Console.WriteLine("ServicebusNamespace");
Console.WriteLine("IssuerName");
Console.WriteLine("IssuerKey");
Console.ResetColor();
Console.WriteLine("\nPress any key to receive the message...");
Console.ReadKey();
return;
}

#region Create Topic
Uri serviceBusUri = ServiceBusEnvironment.CreateServiceUri("sb", ServicebusNamespace, string.Empty);
TokenProvider tokenProvider = TokenProvider.CreateSharedSecretTokenProvider(IssueName, IssueKey);

namespaceManager = new NamespaceManager(serviceBusUri, tokenProvider);
messagingFactory = MessagingFactory.Create(serviceBusUri, tokenProvider);

topicDescription = CreateTopic("MessageFilterTopic");
#endregion

#region Create Subscription
Dictionary<string, string> subscriptionItems = new Dictionary<string, string>();
subscriptionItems.Add("Hobby1", "Hobby LIKE '%Hobby1%'");
subscriptionItems.Add("Hobby2", "Hobby LIKE '%Hobby2%'");
subscriptionItems.Add("Hobby3", "Hobby LIKE '%Hobby3%'");
subscriptionItems.Add("Hobby4", "Hobby LIKE '%Hobby4%'");

CreateSubscription(subscriptionItems);
#endregion

#region Create Message
List<Person> personList = new List<Person>();
personList.Add(new Person() { Name = "Tom", Hobbies = "Hobby1,Hobby4" });
personList.Add(new Person() { Name = "Jerry", Hobbies = "Hobby1,Hobby3" });
personList.Add(new Person() { Name = "Harry", Hobbies = "Hobby2,Hobby3" });
personList.Add(new Person() { Name = "Jim", Hobbies = "Hobby1,Hobby2,Hobby3,Hobby4" });
personList.Add(new Person() { Name = "James", Hobbies = "Hobby5" });
personList.Add(new Person() { Name = "Alan", Hobbies = "Hobby6" });
#endregion

#region Send Messages
foreach (Person item in personList)
{
KeyValuePair<string, object> property = new KeyValuePair<string, object>("Hobby", item.Hobbies);
SendMessage(item, property);
}
#endregion

Console.WriteLine("Press any key to receive the message...");
Console.Read();

#region Receive Message
Console.WriteLine("\n\nHobby1:");
while (true)
{
var hobby1 = ReceiveMessage("Hobby1");
if (null != hobby1)
{
Console.WriteLine(hobby1.ToString());
}
else
{
break;
}
}

Console.WriteLine("\n\nHobby2:");
while (true)
{
var hobby2 = ReceiveMessage("Hobby2");
if (null != hobby2)
{
Console.WriteLine(hobby2.ToString());
}
else
{
break;
}
}

Console.WriteLine("\n\nHobby3:");
while (true)
{
var hobby3 = ReceiveMessage("Hobby3");
if (null != hobby3)
{
Console.WriteLine(hobby3.ToString());
}
else
{
break;
}
}
Console.WriteLine("\n\nHobby4:");
while (true)
{
var hobby4 = ReceiveMessage("Hobby4");
if (null != hobby4)
{
Console.WriteLine(hobby4.ToString());
}
else
{
break;
}
}
#endregion

Console.WriteLine("\nPress any key to exit...");
Console.ReadKey();
}

~BS

Advertisements

One thought on “Implementing Message Filter Pattern using Windows Azure Service Bus

  1. Pingback: Implementing Dynamic Router Pattern using Windows Azure Service Bus | Bhavya

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s