14 May 2013

Pub/Sub Implementation I -- Common Components

After explaining what I plan to build (last time) now it's time to build it out.  This will be very rough around the edges; it's more an exercise to see whether or not this idea has legs enough to be worth spending more time on.

So, if we're going to do this (even rough), let's do it more-or-less right.  That means starting with our common components.  We're going to define our two interfaces, IPublisher<T> and ISubscriber<T>, and our base Message type.  Next, we'll define our MessageBox (I called mine "Broker," but "MessageBox" is more appropriate).  Finally, we'll define our concrete message types.

Here's IPublisher
public interface IPublisher<T> where T: Message
{
  T CreateMessage();
  bool Publish(T message);
  bool Publish(IEnumerable<T> messages);
  Task<bool> PublishAsync(T message);
  Task<bool> PublishAsync(IEnumerable<T> messages);
}

That's the easy one.  It can create a message of its appropriate type, and it can publish a single message or a group of messages.  The "Task<bool>" declarations are to take advantage of the new Async features in .NET 4.5, if we want.
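To make that concrete, here's a minimal, hypothetical sketch (DemoMessage and DemoPublisher are stand-ins I made up, not part of the series) showing how the Task<bool> members can simply wrap their synchronous counterparts with Task.Run:

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

//hypothetical stand-in for a Message subclass
public class DemoMessage
{
    public string Id { get; set; }
}

//hypothetical stand-in for a class implementing IPublisher<T>
public class DemoPublisher
{
    private readonly List<DemoMessage> outbox = new List<DemoMessage>();

    public int Count { get { return outbox.Count; } }

    public bool Publish(DemoMessage message)
    {
        outbox.Add(message); //a real publisher would write to the MessageBox
        return true;
    }

    public bool Publish(IEnumerable<DemoMessage> messages)
    {
        foreach (var m in messages) outbox.Add(m);
        return true;
    }

    //the async versions just push the synchronous work onto the thread pool
    public Task<bool> PublishAsync(DemoMessage message)
    {
        return Task.Run(() => Publish(message));
    }

    public Task<bool> PublishAsync(IEnumerable<DemoMessage> messages)
    {
        return Task.Run(() => Publish(messages));
    }
}
```

Callers can then await the publish (or call .Wait() in pre-async code) without the publisher needing any extra plumbing.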

Here's ISubscriber
public interface ISubscriber<T> where T: Message
{
  IList<T> PendingMessages {get; set;}
  IList<T> ProcessedMessages {get; set;}
 
  IList<T> GetMessages();
  IList<T> FilterMessages(Func<T, bool> predicate);
  Task<IList<T>> GetMessagesAsync();
}

This one defines two properties, in addition to its three methods.  It has to be able to get messages, filter them, and (as the properties suggest) take them from "pending" to "processed."

I did not define a "Process" method, however, because each system will have different requirements for how it does that.  Maybe it just takes its PendingMessages as-is, or maybe it also accepts some parameters, or whatever.
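Just to illustrate the shape such a method might take, here's a hypothetical subscriber (the string payloads and all the names are mine, not part of the interfaces) whose Process drains PendingMessages and moves each handled message into ProcessedMessages:

```csharp
using System;
using System.Collections.Generic;

//hypothetical sketch of a concrete subscriber's system-specific Process
public class DemoSubscriber
{
    public IList<string> PendingMessages { get; set; }
    public IList<string> ProcessedMessages { get; set; }

    public DemoSubscriber()
    {
        PendingMessages = new List<string>();
        ProcessedMessages = new List<string>();
    }

    public void Process(Action<string> handle)
    {
        //snapshot first, so clearing the pending list is safe mid-iteration
        var batch = new List<string>(PendingMessages);
        PendingMessages.Clear();
        foreach (var msg in batch)
        {
            handle(msg);                //system-specific work goes here
            ProcessedMessages.Add(msg); //"pending" to "processed"
        }
    }
}
```

The handle delegate is where each system plugs in its own rules; the pending-to-processed bookkeeping is the only part they'd all share.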

Next, we'll take a look at some of our Message types.  Not all of them, just some.

First, the base Message class.
public abstract class Message
{
  public string Id {get; set;}
  public string Publisher {get; set;}
  public DateTime PublishedDate {get; set;}
 
  public override bool Equals(object obj)
  {
    if(obj == null) return false;
    var msg = obj as Message;
    if(msg == null) return false;
    return (this.Id == msg.Id);
  }
 
  public override int GetHashCode() 
  {
    //keep the hash consistent with our Id-based Equals,
    //so hashed collections (Dictionary, HashSet) behave correctly
    return Id == null ? 0 : Id.GetHashCode();
  }
}

Here, you see we're defining a string "Id" (that's because I'm using RavenDB as my database for this example; you could do it differently), a string "Publisher," and a DateTime "PublishedDate."  Those are things all of my messages will have in common: Which message am I, who created me, and when did they do that?  I've also overridden Equals.  It's possible I'll get the same message back from two different requests to the DB, which would create two identical instances at different memory locations.  Rather than leaving the base reference-equality Equals, I have specified, "Hey, no two of us can have the same Id."
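To see what that override buys us, here's a tiny, self-contained stand-in (Msg is a hypothetical class, not the real Message) demonstrating that two separate instances with the same Id now compare as equal:

```csharp
using System;
using System.Collections.Generic;

//tiny stand-in for the Message base class, to demonstrate Id-based equality
public class Msg
{
    public string Id { get; set; }

    public override bool Equals(object obj)
    {
        if (obj == null) return false;
        var other = obj as Msg;
        if (other == null) return false;
        return this.Id == other.Id;
    }

    //keep GetHashCode consistent with Equals
    public override int GetHashCode()
    {
        return Id == null ? 0 : Id.GetHashCode();
    }
}
```

List<T>.Contains, Distinct(), and friends all go through this Equals, so fetching the same document twice no longer looks like two different messages.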

That's enough for now.  Next time: The MessageBox (Broker).

22 April 2013

Pub/Sub Implementation III – OrderManagement 1

IPublisher<NewOrderMessage>

Last time, we outlined the common components all of our systems will use.  In Real Life, of course, these would each have more detail, and there would probably be more of them.  But, for our purposes, this will do.

Now we're going to start implementing some functionality.  We'll start with our Order Management system, because that's the domain we're focusing on.  As such, it is also the most complex of these pseudo-systems we're going to implement.  It implements our two interfaces a total of five times: two of those are IPublisher<T> and three are ISubscriber<T>.  This post will focus on IPublisher<NewOrderMessage>.

The first publisher we'll be implementing is IPublisher<NewOrderMessage>.  This simulates the idea that the Order Management System would have a front-end (which we will not be implementing) to create an order.  That order will then be stored, and a NewOrderMessage published for anyone who wishes to know.

That means we can't just define the Order Management System, we also have to define what an Order looks like.  Again, this will be a highly simplified example.

    public class Order
    {
        public string Id { get; set; }
        public string Customer { get; set; }
        public List<OrderLine> Details { get; set; }
        public decimal Total 
            { get { return Details.Sum(d => d.LineTotal); } }
        public DateTime OrderDate { get; set; }
 
        public Order()
        {
            OrderDate = DateTime.Now;
            Details = new List<OrderLine>();
        }
 
        public override bool Equals(object obj)
        {
            if (obj == null) return false;
            if (obj is Order)
            {
                Order ord = (Order)obj;
                return (ord.Id == this.Id);
            }
            return false;
        }
 
        public override int GetHashCode()
        {
            //match the Id-based Equals so hashed collections behave
            return Id == null ? 0 : Id.GetHashCode();
        }
    }

Notice, once again, we've overridden Equals.  This is because (once again) it would be at least theoretically possible to get two different instances (and therefore two different memory locations) of the same order- and we only want to fulfill any given order once.

Next, we'll look at OrderLine.  Again, highly simplified.

public class OrderLine
    {
        public string ItemName { get; set; }
        public string ItemSKU { get; set; }
        public decimal ItemPrice { get; set; }
        public int Quantity { get; set; }
        public decimal LineTotal 
          { get { return ItemPrice * Quantity; } }
    }

Now that we have that done, we can look at how we handle NewOrderMessages.  A lot of the code is either boring, or a (needed) variation on code we've already done.  So I'll just highlight things that are interesting or not immediately intuitive.

public NewOrderMessage CreateOrderMessage()
  {
    IPublisher<NewOrderMessage> temp = this as IPublisher<NewOrderMessage>;
    return temp.CreateMessage();
  }
 
public NewOrderMessage CreateOrderMessage(string orderId, string customerId, decimal total, Dictionary<string, int> lines)
  {
    NewOrderMessage om = new NewOrderMessage();
    om.Publisher = PUBLISHER;
    om.PublishedDate = DateTime.Now;
    om.OrderId = orderId;
    om.CustomerId = customerId;
    om.OrderTotal = total;
    om.OrderItems = lines;
    return om;
  }

These first two let me get a new OrderMessage without having to write

IPublisher<NewOrderMessage> publisher = SalesSystem as IPublisher<NewOrderMessage>;
var message = publisher.CreateMessage();

all the time.  Instead, I can simply call CreateOrderMessage with my values from the Order, and get an order message.

Here is the version we use when we're actually processing, though:

public NewOrderMessage CreateOrderMessage(Order order)
{
    string orderId = order.Id;
    string customerId = order.Customer;
    decimal total = order.Total;
    Dictionary<string, int> lines = 
        new Dictionary<string, int>();
    foreach (OrderLine line in order.Details)
    {
        lines.Add(line.ItemSKU, line.Quantity);
    }
 
    pendingOrders.Add(order, "new");
    return CreateOrderMessage(orderId, customerId, total, lines);
}

That pendingOrders.Add refers to a Dictionary where I keep my orders (simply for this example).  IRL, we'd probably have a DB with the orders and their status.
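For completeness, the snippets in this post lean on several members that never actually appear in it: PUBLISHER, broker, pendingOrders, completedOrders, and the ORDERS/MESSAGING/PAGE_SIZE constants.  The declarations below show roughly what they look like; the specific values are illustrative, not gospel:

```csharp
//supporting members referenced by the snippets above; values are illustrative
private const string PUBLISHER = "OrderManagement"; //stamped on each message
private const string ORDERS = "Orders";             //orders database name
private const string MESSAGING = "Messaging";       //messages database name
private const int PAGE_SIZE = 100;

private readonly MessageBox broker = MessageBox.Instance;

//stand-ins for a real orders table with a status column
private Dictionary<Order, string> pendingOrders = new Dictionary<Order, string>();
private List<Order> completedOrders = new List<Order>();
```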

And, finally, we get to processing:

public void GetNewOrders()
{
    List<Order> orders = new List<Order>();
    int count = 0;
    do
    {
        int cursor = orders.Count;
        var temp = broker.GetPagedMessages<Order>(ORDERS, cursor, PAGE_SIZE);
        count = temp.Count();
        if (count == 0) break;
        orders.AddRange(temp);
    }
    while (count > 0);
    ProcessOrders(orders);                      
}
 
public void ProcessOrders(List<Order> orders)
{
    var query = new List<NewOrderMessage>(); 
    int count = 0;
    do
    {
       int cursor = query.Count;
       var temp = broker.GetPagedMessages<NewOrderMessage>(MESSAGING, cursor, PAGE_SIZE);
       count = temp.Count();
       if (count == 0) break;
       query.AddRange(temp);
    }
    while (count > 0);
 
   foreach (var ord in orders)
   {
       if (!pendingOrders.ContainsKey(ord) 
           && !completedOrders.Contains(ord)
           && !query.Any(q => q.OrderId == ord.Id))
       {
           IPublisher<NewOrderMessage> pub = 
            this as IPublisher<NewOrderMessage>;
             pub.Publish(this.CreateOrderMessage(ord));
       }
   }
}

GetNewOrders goes and grabs any orders we've got.  Note the "Do/While" syntax- I want to fetch records at least once, and this way I keep going back until I've got them all.

Next, I process the orders.  To do that, the first thing I do is get all the OrderMessages I've already sent.  This is really an artifact of this being an example instead of Real Life.  In Real Life, my order would be updated to some status when I had published a new order message.  For the moment, this is easier than building out a "real" order message and ALSO building in the necessary checks.

So I get all NewOrderMessage documents from the database.  Next, I take a look at my orders, and validate that they're not already in my pending bucket, that they're not already completed, and that I haven't already published a message for them.  Assuming all of that is true, I publish the Order via NewOrderMessage.

Next, we'll look at ISubscriber<BilledOrderMessage> and ISubscriber<AllocatedOrderMessage>.

19 April 2013

Pub/Sub Implementation II – The Message Box

Okay, I call mine "Broker" but MessageBox is more correct- therefore it will be "MessageBox" from here on out.

The MessageBox just defines the connection to the database.  Others would call it a DAL, but I'm not actually accessing object data, I'm accessing messages.  Thus, "MessageBox."  In my case, I chose RavenDB for this implementation.  I really like RavenDB: it's lightweight, it's intuitive, and it's fast to get up-and-running.

Here's my implementation:

public class MessageBox
{
  private DocumentStore store;
  public DocumentStore Store { get { return store; } }
 
  private MessageBox()
  {
    store = new DocumentStore() { Url = "http://localhost:8080" };
    store.Initialize(); //the store can't open sessions until it's initialized
  }
 
  private static Lazy<MessageBox> instance = 
      new Lazy<MessageBox>(() => new MessageBox());
  public static MessageBox Instance 
      { get { return instance.Value; } }
 
  public bool Insert<T>(T item, string db)
  {
    using(var session = Store.OpenSession(db))
    {
      session.Store(item);
      session.SaveChanges();
    }
    return true;
  }  
 
  public bool BulkInsert<T>(IEnumerable<T> items, string db)
  {
    using (var bulk = Store.BulkInsert(db))
    {
      for(int i = 0; i < items.Count(); i++)
      {
        bulk.Store(items.ElementAt(i));
      }
    }
    return true;
  }
 
  public T GetItemById<T>(string db, string id)
  {
    using(var session = Store.OpenSession(db))
    {
      return session.Load<T>(id);
    }
  }
 
  public IEnumerable<T> GetItemsByIds<T>(string db, string[] ids)
  {
    using(var session = Store.OpenSession(db))
    {
      return session.Load<T>(ids);
    } 
  }
 
  public IEnumerable<T> GetPagedMessages<T>(string db, int start, int pageSize)
  {
    using(var session = Store.OpenSession(db))
    {
      var result = session.Query<T>()
           .Skip(start)
           .Take(pageSize);
      return result.ToList();
    }
  }
}

Okay, that was a beating, so let's look at what the code actually does.  First off, we set up our DocumentStore (that's the connection to the actual RavenDB database).

Next, we set up the MessageBox as a singleton.  We'll probably be calling it a lot, and it's better if we don't create multiple copies per app-domain.
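If the Lazy<T> pattern above is unfamiliar, here's a stripped-down, runnable version (the Single class is purely for demonstration): the instance is built exactly once, on first access, and Lazy<T> makes that first access thread-safe by default.

```csharp
using System;

//stripped-down version of the Lazy<T> singleton pattern used in MessageBox
public sealed class Single
{
    public static int Constructions = 0;

    //private constructor: nobody outside can new one up
    private Single() { Constructions++; }

    //the factory lambda runs only on the first access to instance.Value
    private static readonly Lazy<Single> instance =
        new Lazy<Single>(() => new Single());

    public static Single Instance
    {
        get { return instance.Value; }
    }
}
```

That's why MessageBox exposes only the static Instance property and keeps its constructor private.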

Finally, we establish the actual operations the MessageBox will do for us.  Note that this is the MessageBox code.  If we were using RavenDb as our backing store for our business objects, we'd probably have a different class (or just have every application use the RavenDB client, and write their own code).

There are two somewhat tricky methods here.  The first is BulkInsert.  I'm not sure why RavenDB doesn't support a Save method (even under its "BulkInsert" functionality) that allows me to pass an array or enumerable of objects, but it doesn't.  So we use BulkInsert (which still only takes one at a time??).  The reason we're using a For loop, instead of ForEach, is that using BulkInsert to save each document automatically provides that object with an Id, thus modifying our collection and making ForEach unavailable.

The second one is GetPagedMessages.  RavenDB, by default, will only provide you with 128 documents at a time.  So, if you expect more than that, you have to page through the results.  This is just a basic paging system; there are actually better ways to do it with RavenDB, but a) this is fast, b) this is functional, and c) it isn't actually wrong.  I use ToList() there because session.Query() returns an IRavenQueryable collection, and sometimes that doesn't play nicely with IEnumerable.  Calling .ToList() also forces the query to run right then, before the session is disposed, rather than leaving a lazy query pointed at a dead session.

Okay, that was a slog.  But I promised the MessageTypes, so here they are:

    public class NewOrderMessage : Message
    {
        public string OrderId { get; set; }
        public string CustomerId { get; set; }
        public decimal OrderTotal { get; set; }
        public Dictionary<string, int> OrderItems { get; set; } //TKey = SKU, TVal = quantity
    }
 
    public class BilledOrderMessage : Message
    {
        public string OriginId { get; set; }
        public string OrderId { get; set; }
        public string CustomerId { get; set; }
        public bool Approved { get; set; }
    }
 
    public class AllocatedOrderMessage : Message
    {
        public string OriginId { get; set; }
        public string OrderId { get; set; }
        public Dictionary<string, int> OrderItems { get; set; }
        public string WarehouseId { get; set; }
        public bool Allocated { get; set; }
    }
 
    public class FulfillmentRequestOrderMessage : Message
    {
        public string OriginId { get; set; }
        public string OrderId {get; set;}
        public string BillingId { get; set; }
        public string AllocId { get; set; }
        public string WarehouseId { get; set; }
    }
 
    public class FulfilledOrderMessage : Message
    {
        public string OriginId { get; set; }
        public string OrderId { get; set; }
        public string RequestId { get; set; }
        public bool Fulfilled { get; set; }
    }

 

Next Up- The Order Management System.

15 April 2013

Pub/Sub Revisited

How many enterprises say they want a "Publisher/Subscriber" architecture?  "We want," they say, "for the Ordering system just to send one message, and for everyone who needs it to get it.  Ordering shouldn't know anything about the down-stream systems."  Okay, that's fine as far as it goes.  But what about the other way around?  The current paradigm says that *someone* has to know about both up-stream and down-stream systems.

Either every "down-stream" system knows about every up-stream system that it cares about (and you'd be amazed at how many that is.  You'd also be amazed at how fast Ordering finds that it is down-stream of Inventory in the right circumstances), or some central system (or "Broker") knows about everybody and keeps them all straight.

Now, the first option is terrible.  Suddenly this "Publisher/Subscriber" methodology becomes Code-thulu.

The second option is fine, but it's not actually Publisher/Subscriber.  It's Hub-and-Spoke with a Broker.  Nothing wrong with that, but it has certain ramifications which will be different if you were expecting publisher/subscriber.

"But how," I hear you asking, "Can we get 'real' Publisher/Subscriber?"  And you have a point.  At some point, someone has to know about both up-stream and down-stream systems, don't they?

Well... no.  Not really, if you're willing to do a little work.

Now, let me be clear here- what I'm suggesting does not reduce complexity in any significant way.  We're talking about complex interactions between complex systems.  We can abstract that complexity away all we want, but that complexity will still exist somewhere.  What I'm suggesting allows the complexity to exist where it can most readily be changed/fixed- within the individual business domains inside your company.

How does this methodology work?  Enter the Dragon, err... Database.  Publisher/Subscriber does exist in the wild- it's called RSS, or Email, or any number of other things.  In those systems, the Publisher simply publishes a message (a blog post, an email, whatever), and the subscriber goes to look for new messages on a schedule.  Ours will work the same way.
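In miniature, the interaction looks like this (a Queue<string> stands in for the shared database here, so this is an illustration of the idea rather than the real implementation):

```csharp
using System;
using System.Collections.Generic;

//miniature of the mailbox idea: publishers drop messages in a shared store,
//and subscribers come check it on their own schedule
public class PollingDemo
{
    private readonly Queue<string> store = new Queue<string>();

    public void Publish(string message)
    {
        store.Enqueue(message); //the publisher's only job: drop it off
    }

    public List<string> Poll()
    {
        //the subscriber's job: collect whatever is waiting, whenever it likes
        var received = new List<string>();
        while (store.Count > 0)
            received.Add(store.Dequeue());
        return received;
    }
}
```

The publisher never knows who reads the store, and the subscriber never knows who wrote to it; the polling schedule is entirely the subscriber's business.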

Let us say we have a business that sells widgets online.  We'll have a system that takes the orders and owns them; its job is to say "I've got a new order," and also to say, "Okay, warehouse, the order is confirmed; go ahead and ship it."  We'll have a billing system.  The billing system's job is to listen for a new order, and then verify the customer paid (or has sufficient credit to pay).  We'll have an inventory system whose job is to listen for a new order and make sure we have the available inventory.  We could go on.

So how do all these systems communicate?

A common messagebox, and defined message types.

Our system will have a base "Message" from which will be derived a "NewOrderMessage," a "BilledOrderMessage," an "AllocatedOrderMessage," and a "FulfillmentRequestOrderMessage."  (There would be more as well IRL).

Then, we'll define (on top of those Messages) a MessageBox (I call it Broker on my system, but don't get confused- I merely mean that it "brokers" the DB functions), and two interfaces: IPublisher<Message> and ISubscriber<Message>.

Because these are typed interfaces, I can actually implement several of the same one (and, in some cases, I'll have to) without much difficulty.  For instance, our OrderSystem will be IPublisher<NewOrderMessage> and IPublisher<FulfillmentRequestOrderMessage>, and it will be ISubscriber<BilledOrderMessage> and ISubscriber<AllocatedOrderMessage>.  This is why explicitly implemented interfaces exist, after all.

Then, in general, an order will come in, and the OrderSystem will publish the requisite NewOrderMessage.  Billing and Inventory will be listening for that message, and will each do their job based on the information therein.  When done, Billing (IPublisher<BilledOrderMessage>) will publish its message saying "yay" or "nay," and Inventory (IPublisher<AllocatedOrderMessage>) will publish its message saying "yep, we've got enough" or "nope, we'll have to back-order."  The OrderSystem will then wait until it has both an AllocatedOrderMessage and a BilledOrderMessage for the order before requesting fulfillment ("Hey, billing and inventory say this is a go- somebody call FedEx!").

12 April 2013

The Lowly Do/While

So, semi-officially, I'm exploring some options regarding a Pub/Sub Architecture at work.

One of the requirements (for this to be worth continuing exploring instead of just grabbing something off-the-shelf) is that we want to remove the man-in-the-middle that so many other frameworks use.

Before I go on, let me explain that:
Most pub/sub architectures, buses, or solutions are actually broker solutions.  Whether we're talking BizTalk (which does, indeed, use pub/sub internally), MassTransit (which looks like an awesome product, though I've never used it), or whatever, there is something in the middle that is maintaining the subscriptions.  A subscriber does not request messages; messages are pushed to the subscriber.

We want to explore the possibility of getting away from that paradigm.  We want the publisher simply to be able to lay their message wherever, and the subscribers to say "Hey, I wonder if I have mail?" and go check.

With that out of the way, what I'm currently exploring is the ability to lay messages on a DB (I'm using RavenDB for development because a) it's easy and b) It's what I'd like to use), and then have the subscribers be responsible for picking up those messages.

That's an interesting problem right there.  This is especially true with Raven's limit of 128 documents per request, but even with SQL or Oracle, you run into issues of "how often do I poll, and how do I know when to stop?"  Pulling back "all" the messages is no big deal when you have, say, 1000 or less.  It becomes a much bigger deal once you start approaching the 100,000+ range.

I started with the standard "while" loop.  You know; you've used it more times than you can count.  And you're already thinking (many of you): "Sure.  You'll go fetch records, check to see if you need more, and then use a while loop to finish fetching your documents."

You're even thinking of something along these lines:

int count = 0;
int cursor = 0;
List<Document> documents = new List<Document>();
using(var session = DocStore.OpenSession())
{
  var results = session.Query<Document>();
  if(results.Count() > 0) documents.AddRange(results);
  count = results.Count();
  while(count > 0)
  {
    cursor = documents.Count;
    results = session.Query<Document>()
              .Skip(cursor)
              .Take(128);
    count = results.Count();
    if(count == 0) break;
    documents.AddRange(results);
  }
}

Many of you are looking at that and (except for how the paging is being handled -hey, it's an example, not production code-) see nothing particularly wrong.

And that's where I started.  But it looked messy to me.  It was hard to say "Here's what I'm doing."  So, I considered, and then I remembered something from long ago.  It was a construct that I hadn't used since my computer classes.  In fact, if you look around the Net, you'll see some people calling it the worst of the loop constructs.

I'm referring to the lowly Do/While loop.

Where a For loop says "Hey, do this X times," and a While loop says "Hey, as long as this is true, keep doing this," Do/While says "Here, do this.  Then if this condition is true, keep doing it."  That is, the action inside the loop will always run at least once.

When fetching records from a database that may or may not exist, it is incredibly helpful.

Here's the cleaner (IMO) code:

int count = 0;
int cursor = 0;
int page_size = 100; //I like 100 docs at a time.  Use as many as you want up to 128
List<Document> documents = new List<Document>();
using(var session = DocStore.OpenSession())
{
  do
  {
    cursor = documents.Count; //advance past what we've already fetched
    var results = session.Query<Document>()
          .Skip(cursor)
          .Take(page_size);
    count = results.Count();
    if(count == 0) break;
    documents.AddRange(results);
  }
  while(count > 0);
}

I added that "page_size" variable, just for clarity.  It's better to state explicitly "We'll be returning up to 100 records at a time" than just leave it up to the RavenDB configuration.  Easier for someone later to read and know what's happening.

See how much easier it flows?  The original, if read for understanding, would say:
"First, we go ahead and initialize some variables, including a list to store our results and two ints... I guess I'll see what those do.  Okay, then we go fetch the records from the database (Oh, yeah, this is RavenDB, so it's probably 128).  Then we check to see how many results we got back.  If we have any, we'll add those to the list, and then go look for more.  Oh, that's what we're using those two ints for, one is how many records we got back this time, the other is how many records we have total, so we know how many to skip.  Got it.  We'll keep doing that until a fetch doesn't grab any."

The second, reads this way:
"Okay, we initialize some variables: a list to store our results, and three ints... I guess I'll see what those do.  Then, we go fetch results- oh, there's the first int: cursor must be the total records we have grabbed and page size means how many records I'm bringing back.  Then count is how many records were brought back.  If there weren't any, we stop.  Otherwise we add the current result set to our list, and keep going."

Both things do functionally the same operation, the only difference is in readability.

Over on StackOverflow, the question about the difference between Do/While and While has been asked a couple of times, and the consensus seems to be "I've never needed it" or "I've seldom needed it."  But how often do we do exactly this functionality- "Hey, do this until it's done." 

If you need to make sure it's done at least once, then Do/While is your man.

25 November 2011

Kindle, Kindle woo-hoo!

So, I got my Kindle Fire Wednesday afternoon.  I've had a little chance to play with it.

A full "Un-boxing and review" post will come later, but here are some quick impressions.

For what I plan to do with it, it seems fine.  Browsing the internet, downloading and using apps, watching videos: all of these things are great (so far).  My only problem is that I had been planning to post this from the Kindle.  That didn't work.

Everywhere else I've been trying to post has been fine, so it may just be the Blogger interface, but I was hoping that short posts would be without too much pain.  That seems not to be the case at the moment.  Maybe I'll figure it out after playing with it some more, but that is my only pain point, it seems.

All in all, I give it a solid 3.5 / 5 on a preliminary basis.