Thursday 17 September 2009

C# WCF Peer to Peer .NET solution

Well, I finally decided to actually write up my peer-to-peer resolver solution (for all those interested in the straight peer-to-peer solution, that will follow later). The solution is fairly simple, as the framework does most of the work for us.

The solution is an adaptation of David Whitney's solution (link). I'll comment directly on David's site on why not to use his design.

So where do we start? The solution uses a WCF peer resolver service that manages the initial client connection, so that seems the most logical place.

As with all the important elements of this solution, it's best to use a thread-safe singleton pattern to control your resolver instance (we don't want uncontrolled duplicate instances out there, now do we?). For more details on implementing the singleton pattern, see this link.

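Create a new Class Library project (referencing System.ServiceModel), and add a public sealed class named "PersistentResolver". The resolver code in this post needs these using directives:

using System;
using System.ComponentModel;
using System.ServiceModel;
using System.ServiceModel.PeerResolvers;
using System.Threading;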

Before I go any further: I generally break my code up into logical regions (as many of you do), so first off, the private members.

#region members

//Used for locking the singleton
private static readonly object _syncRoot = new object();
//volatile so the double-checked lock in Instance never sees a half-constructed object
private static volatile PersistentResolver _instance;

//Framework resolver
private readonly CustomPeerResolverService _resolver;
private readonly ServiceHost _serviceHost;
private static BackgroundWorker _worker;

#endregion

Nice and simple so far. Some people choose to write their own implementation of the resolver service instead of using CustomPeerResolverService, but in all honesty you want this to be as light as possible, not to mention there's no point re-inventing the wheel.

#region instance

public static PersistentResolver Instance
{
get
{
//Double-checked locking: check again inside the lock, otherwise two threads
//that both saw null could each create an instance
if (_instance == null)
{
lock (_syncRoot)
{
if (_instance == null)
{
_instance = new PersistentResolver();
}
}
}
return _instance;
}
}

//Private constructor called by first instance
private PersistentResolver()
{
try
{
//Initialise the resolver and the service host
_resolver = new CustomPeerResolverService { ControlShape = false };
_serviceHost = new ServiceHost(_resolver);
}
catch (Exception ex)
{
//handle your exception however you want...
throw;
}
}

#endregion

The next part is the meat of the resolver (what little there is), including the heartbeat.
The heartbeat is what tells the connected clients that the resolver is still there (part of the "what happens if my resolver needs to be restarted" solution). The resolver sends out a heartbeat every 5 seconds; if a client fails to receive an expected heartbeat, it can take appropriate action, but I'll cover that more later.

The "Listen" and "StopListening" methods control the running of the resolver service; the additional "JoinNetwork", "StartHearbeat" and "ExecuteSend" methods provide the failure handling.

So, the resolver code:

#region methods

public void Listen()
{
//Open the resolver and service host
_resolver.Open();
_serviceHost.Open();
}

public void StopListening()
{
//close the resolver and service host
_resolver.Close();
_serviceHost.Close();
}

public void JoinNetwork()
{
//Join the peer mesh (ie connect to resolver and join p2p network).
//I'll come back to this method call a little later in the PeerHelper class (client class).
PeerHelper.Instance.JoinNetwork(new EmptyMessageProcessor());
}

public void StartHearbeat()
{
//This starts a background worker calling the ExecuteSend method
_worker = new BackgroundWorker();
_worker.DoWork += ExecuteSend;
_worker.RunWorkerAsync();
}

private void ExecuteSend(object sender, DoWorkEventArgs e)
{
try
{
//Call the SendHeartBeat method on the PeerHelper every 5 seconds.
while (true)
{
PeerHelper.Instance.SendHeartBeat();
Thread.Sleep(5000);
}
}
catch (Exception ex)
{
//SnitchInformer is a logging helper not shown in this post; use your own logging here
SnitchInformer.LogException(ex, ErrorSeverity.Fatal);
throw;
}
}

#endregion
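As written, ExecuteSend loops forever, so StopListening closes the host but the heartbeat worker keeps running (and keeps calling PeerHelper). If you want the heartbeat to stop with the service, one option is a cancellable loop. This is a sketch only: HeartbeatSender is a hypothetical class, it uses Task.Run and CancellationTokenSource (.NET 4.0/4.5, so newer than the BackgroundWorker used above), and the send action is passed in so it can be tried without WCF.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper (not part of the original code): a heartbeat loop that can be stopped.
public sealed class HeartbeatSender
{
    private readonly Action _sendHeartbeat;
    private readonly TimeSpan _interval;
    private CancellationTokenSource _cts;
    private Task _loop;

    public HeartbeatSender(Action sendHeartbeat, TimeSpan interval)
    {
        _sendHeartbeat = sendHeartbeat ?? throw new ArgumentNullException(nameof(sendHeartbeat));
        _interval = interval;
    }

    // Starts the loop on a thread-pool thread; does nothing if it is already running.
    public void Start()
    {
        if (_loop != null) return;
        _cts = new CancellationTokenSource();
        CancellationToken token = _cts.Token;
        _loop = Task.Run(() =>
        {
            while (!token.IsCancellationRequested)
            {
                _sendHeartbeat();
                // Waits for the interval, but returns true straight away if Stop() cancels the token.
                if (token.WaitHandle.WaitOne(_interval)) break;
            }
        });
    }

    // Signals the loop to finish and waits for it. If the send action threw,
    // Wait() rethrows that exception wrapped in an AggregateException.
    public void Stop()
    {
        if (_loop == null) return;
        _cts.Cancel();
        try
        {
            _loop.Wait();
        }
        finally
        {
            _cts.Dispose();
            _cts = null;
            _loop = null;
        }
    }
}
```

In PersistentResolver you would create it with `new HeartbeatSender(PeerHelper.Instance.SendHeartBeat, TimeSpan.FromSeconds(5))`, call `Start()` from StartHearbeat and `Stop()` from StopListening.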

That's your core resolver code finished; all that's left is to call it from the host. To start the service (or a console host, etc.), just call:

PersistentResolver.Instance.Listen();
PersistentResolver.Instance.JoinNetwork();
PersistentResolver.Instance.StartHearbeat();

To close the service, just call:

PersistentResolver.Instance.StopListening();

You'll need to declare the service entry, with its address and endpoint, in the system.serviceModel section of the app.config:

...


You'll also need a client endpoint to be able to connect (in the client section of system.serviceModel).
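The configuration listings aren't reproduced here, so as a rough sketch only, modelled on the layout of Microsoft's Peer Channel custom resolver samples, a matching app.config might look like this. The port, mesh address, endpoint and binding names, and the `YourNamespace` contract prefix are all placeholders (assumptions), and security is switched off purely for a local test.

```xml
<configuration>
  <appSettings>
    <!-- Read by PeerHelper.ValidateConnection to pick the client endpoint below -->
    <add key="PeerMeshEndpointName" value="PeerMeshEndpoint"/>
  </appSettings>
  <system.serviceModel>
    <services>
      <!-- Resolver side: the framework's CustomPeerResolverService over net.tcp (placeholder address) -->
      <service name="System.ServiceModel.PeerResolvers.CustomPeerResolverService">
        <endpoint address="net.tcp://localhost:8089/PeerResolverService"
                  binding="netTcpBinding"
                  bindingConfiguration="ResolverTcpBinding"
                  contract="System.ServiceModel.PeerResolvers.IPeerResolverContract"/>
      </service>
    </services>
    <client>
      <!-- Client side: joins the mesh, using the custom resolver configured below -->
      <endpoint name="PeerMeshEndpoint"
                address="net.p2p://SampleMesh/PeerMessages"
                binding="netPeerTcpBinding"
                bindingConfiguration="PeerMeshBinding"
                contract="YourNamespace.IPeerMessageProcessor"/>
    </client>
    <bindings>
      <netTcpBinding>
        <binding name="ResolverTcpBinding">
          <security mode="None"/>
        </binding>
      </netTcpBinding>
      <netPeerTcpBinding>
        <binding name="PeerMeshBinding" port="0">
          <security mode="None"/>
          <resolver mode="Custom">
            <custom address="net.tcp://localhost:8089/PeerResolverService"
                    binding="netTcpBinding"
                    bindingConfiguration="ResolverTcpBinding"/>
          </resolver>
        </binding>
      </netPeerTcpBinding>
    </bindings>
  </system.serviceModel>
</configuration>
```

Because the resolver host also joins the mesh through PeerHelper, it needs the appSettings, client and binding sections as well as the service entry; a client-only machine can leave out the services section.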



That's it; everything else is fairly standard (google WCF services for more help, or email me if you're struggling).



So now we're on to the interesting bit, the client code (after all, it's what all this work is for). I'll just try to explain the client code a little before I go any further.

The clients will use a WCF contract implementation, IPeerMessageProcessor, to handle messages. To put it simply, whenever a contract method is executed on the peer mesh, all clients execute their own instance of that method. Let's say we have a message called "HelloWorld" that simply prints the text "Hello World": each client has an instance of the WCF contract (containing a "HelloWorld" method), and if one client calls the method, all clients call their own instance, so the "Hello World" message appears on all clients.
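To make the idea concrete without any WCF plumbing, here is a small in-memory simulation of the "one call, every client runs its own copy" behaviour. IHelloWorld, HelloClient and InMemoryMesh are hypothetical names; in the real solution the contract is IPeerMessageProcessor and the fan-out is done by the peer channel, not by a list.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical contract with the single "HelloWorld" operation from the example above.
public interface IHelloWorld
{
    void HelloWorld();
}

// Each client owns its own implementation of the contract.
public sealed class HelloClient : IHelloWorld
{
    public string Name { get; }
    public int TimesCalled { get; private set; }

    public HelloClient(string name)
    {
        Name = name;
    }

    public void HelloWorld()
    {
        TimesCalled++;
        Console.WriteLine($"[{Name}] Hello World");
    }
}

// In-memory stand-in for the mesh: no WCF, just a list of joined clients.
// Whether the calling node also receives its own call is not modelled here.
public sealed class InMemoryMesh
{
    private readonly List<IHelloWorld> _clients = new List<IHelloWorld>();

    public void Join(IHelloWorld client) => _clients.Add(client);

    // One call on the mesh runs every joined client's own HelloWorld method.
    public void HelloWorld()
    {
        foreach (IHelloWorld client in _clients)
        {
            client.HelloWorld();
        }
    }
}
```

Joining clients "A" and "B" and calling `mesh.HelloWorld()` once prints `[A] Hello World` and `[B] Hello World`.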

It's a very elegant system, and I've tried to implement it in a stable way that makes shared and custom implementations across the clients easy. So, to start, let's create our WCF contract:

using System.ServiceModel;

[ServiceContract(Namespace = "http://Microsoft.ServiceModel.Samples", CallbackContract = typeof(IPeerMessageProcessor))]
public interface IPeerMessageProcessor
{
/// <summary>
/// Process a PeerMessage object
/// </summary>
[OperationContract(IsOneWay = true)]
void ProcessMessage(PeerMessage message);

/// <summary>
/// Process a heartbeat
/// </summary>
[OperationContract(IsOneWay = true)]
void ProcessHeartbeat();
}

The PeerMessage object is just a plain data object and can contain anything you want (for example, the class below). It fully supports complex types, but be careful how much you pass, as there are message size limits.

public class PeerMessage
{
public string MessageText {get;set;}
public DateTime MessageSent {get;set;}
}
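The test code at the end of this post calls `new PeerMessage("...")`, which the two-property class above doesn't support. One way to add that, assuming you're happy to mark the class as a data contract (DataContractSerializer is what WCF uses by default), is sketched here; the attributes and both constructors are additions, not the original class.

```csharp
using System;
using System.Runtime.Serialization;

// One possible PeerMessage (assumption): [DataContract]/[DataMember] and the
// constructors are additions to the two-property class above.
[DataContract]
public class PeerMessage
{
    // Declaring any constructor removes the compiler-generated parameterless one,
    // so it is declared explicitly to keep object-initialiser syntax working.
    public PeerMessage()
    {
    }

    // Convenience constructor matching the PeerMessage("...") calls used later in this post.
    public PeerMessage(string messageText)
    {
        MessageText = messageText;
        MessageSent = DateTime.Now;
    }

    [DataMember]
    public string MessageText { get; set; }

    [DataMember]
    public DateTime MessageSent { get; set; }
}
```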

Of course, we also need a default implementation of the contract. This will be our base message processor: all custom processors inherit from it, which lets you pick and choose which default functionality to keep.

/// <summary>
/// Default message processor - all processors should inherit this
/// </summary>
public class MessageProcessor : IPeerMessageProcessor
{
/// <summary>
/// Default message processor behaviour
/// </summary>
public virtual void ProcessMessage(PeerMessage message)
{
}

/// <summary>
/// Default message processor heartbeat behaviour
/// </summary>
public virtual void ProcessHeartbeat()
{
//handles the heartbeat receipt from the resolver
PersistentHeartbeatMonitor.Instance.ActiveProcessor = this;
PersistentHeartbeatMonitor.Instance.LastRecievedHeartbeat = DateTime.Now;
}
}

We need an empty subclass of this for the resolver service (see PersistentResolver.JoinNetwork earlier).

public class EmptyMessageProcessor : MessageProcessor
{
public override void ProcessMessage(PeerMessage message)
{
}

public override void ProcessHeartbeat()
{
}
}

Using this subclass as your message processor overrides the default behaviour to do nothing, so the resolver, which is itself the source of the heartbeats, never starts the heartbeat monitor.

Back to the MessageProcessor: you no doubt spotted the PersistentHeartbeatMonitor in the ProcessHeartbeat method, so I'll go through that very quickly.

The heartbeat monitor is another singleton object; it monitors the mesh resolver and handles unexpected disconnection. I have seen some very poor alternative methods (I'll discuss those in a minute).
Very simply, once initialised, this class checks every 5 seconds whether a heartbeat has been received within the last 20 seconds; if not, it attempts to reconnect to the peer mesh, and keeps trying until it succeeds. I have tested this and it works a treat, but feel free to do what you want with it.
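The "heartbeat within the last 20 seconds" rule is easy to get subtly wrong and awkward to test against the real clock. A hypothetical HeartbeatWindow class that isolates it is sketched here. Two assumptions differ from the original: the clock is passed in as a `Func<DateTime>`, and a lock guards the timestamp, since it is written by the WCF thread running ProcessHeartbeat and read by the monitor's worker thread.

```csharp
using System;

// Hypothetical helper (not part of the original code) holding the
// "heartbeat received within the last N seconds" rule.
public sealed class HeartbeatWindow
{
    private readonly object _gate = new object();
    private readonly TimeSpan _tolerance;
    private readonly Func<DateTime> _now;
    private DateTime _lastReceived;

    public HeartbeatWindow(TimeSpan tolerance, Func<DateTime> now)
    {
        _tolerance = tolerance;
        _now = now ?? throw new ArgumentNullException(nameof(now));
        // Count start-up as a heartbeat so the very first check doesn't trigger a reconnect.
        _lastReceived = _now();
    }

    // Call this from ProcessHeartbeat (a WCF thread).
    public void Record()
    {
        lock (_gate) { _lastReceived = _now(); }
    }

    // Read this from the monitor's worker thread; same test as the original: last + tolerance > now.
    public bool IsWithinLimits
    {
        get { lock (_gate) { return _lastReceived + _tolerance > _now(); } }
    }
}
```

In the monitor you would create it with `TimeSpan.FromSeconds(20)` and `() => DateTime.UtcNow`. Using UtcNow rather than Now avoids a false alarm (or an hour of missed alarms) when daylight-saving time shifts the local clock.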
First, a simple set of members for the instance, the worker and the current processor, then the Instance property:

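private static readonly object _syncRoot = new object();
//volatile so the double-checked lock in Instance never sees a half-constructed object
private static volatile PersistentHeartbeatMonitor _instance;
private static BackgroundWorker _worker;
private static MessageProcessor _currentProcessor;

public static PersistentHeartbeatMonitor Instance
{
get
{
//Double-checked locking: re-check inside the lock so only one monitor
//(and so only one worker thread) is ever created
if (_instance == null)
{
lock (_syncRoot)
{
if (_instance == null)
{
_instance = new PersistentHeartbeatMonitor();
}
}
}
return _instance;
}
}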

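private PersistentHeartbeatMonitor()
{
//Treat start-up as a fresh heartbeat; otherwise LastRecievedHeartbeat is still
//DateTime.MinValue and the first check would immediately try to reconnect
LastRecievedHeartbeat = DateTime.Now;
//Set off the heartbeat monitor on a backgroundworker thread
_worker = new BackgroundWorker();
_worker.DoWork += ExecuteHeartbeatWorker;
_worker.RunWorkerAsync();
}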

The only thing slightly different here is the constructor: it attaches the monitoring method to a background worker and starts it.

public void ExecuteHeartbeatWorker(object sender, DoWorkEventArgs e)
{
//Leave the heartbeat monitor in a constant loop
while(true)
{
//Check if we have had a heartbeat recently.
if (!IsLastHeartbeatWithinAcceptableLimits)
{
//If not try to reconnect
TryToRejoinPeerMesh();
}
//wait 5 seconds before we check again.
Thread.Sleep(5000);
}
}

public bool IsLastHeartbeatWithinAcceptableLimits
{
get
{
//have we received a heartbeat in the last 20 seconds
return (LastRecievedHeartbeat.AddSeconds(20) > DateTime.Now);
}
}

public void TryToRejoinPeerMesh()
{
//Retry logic is in a never ending loop.
while (true)
{
try
{
//Try to join the peer mesh resolver
PeerHelper.Instance.JoinNetwork(ActiveProcessor);
//If it gets this far the connection was successful, so exit the loop.
break;
}
catch
{
//JoinNetwork failed: abort the network (so a fresh channel is available to try again),
//then sleep the thread for 5 seconds before the next attempt.
PeerHelper.Instance.AbortNetwork();
Thread.Sleep(5000);
}
}
}

The above is fairly simple: every 5 seconds, check whether we have received a heartbeat; if not, keep trying to reconnect every 5 seconds until successful.
The last part is the properties used by the MessageProcessor's ProcessHeartbeat method.

public DateTime LastRecievedHeartbeat { get; set; }

public MessageProcessor ActiveProcessor
{
get { return _currentProcessor ?? new MessageProcessor(); }
set { _currentProcessor = value; }
}
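One thing to be aware of: TryToRejoinPeerMesh never gives up, and it blocks the monitor's worker thread while it retries. If you'd like a cap on attempts (or just want to test the loop), the same try, abort, wait cycle can be pulled out into a helper. MeshReconnect and its maxAttempts parameter are hypothetical; the join and abort steps are passed in as delegates so the sketch runs without WCF.

```csharp
using System;
using System.Threading;

// Hypothetical generalisation of TryToRejoinPeerMesh (not the original code).
public static class MeshReconnect
{
    // Calls join until it succeeds, calling abort after each failure and waiting
    // delay between attempts. maxAttempts <= 0 retries forever, like the original;
    // otherwise the last failure is rethrown. Returns the number of attempts needed.
    public static int RetryUntilJoined(Action join, Action abort, TimeSpan delay, int maxAttempts = 0)
    {
        int attempt = 0;
        while (true)
        {
            attempt++;
            try
            {
                join();
                return attempt;
            }
            catch (Exception)
            {
                // Throw away the failed channel so the next attempt starts from a fresh one.
                abort();
                if (maxAttempts > 0 && attempt >= maxAttempts)
                {
                    throw;
                }
                Thread.Sleep(delay);
            }
        }
    }
}
```

Inside the monitor this would be called as `MeshReconnect.RetryUntilJoined(() => PeerHelper.Instance.JoinNetwork(ActiveProcessor), PeerHelper.Instance.AbortNetwork, TimeSpan.FromSeconds(5));`.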


OK, on to some actual code the clients can use. To start with, we'll have a PeerConnection class to control the client connection and the processing of messages. It contains the following members and constructor:

//Required by the wcf channel
private DuplexChannelFactory<IMessageChannel> _factory;
private InstanceContext _context;
private IMessageChannel _participant;
private readonly string _endPointName;
//Our specific instance of the IPeerMessageProcessor contract
private MessageProcessor _processor;

//Pass the processor instance and an endpoint name in the constructor (I hold the endpoint name in the config)
internal PeerConnection(MessageProcessor processor, string endPointName)
{
_endPointName = endPointName;
_processor = processor;
CreateChannel();
}

The IMessageChannel interface is a simple interface inheriting both the contract and IClientChannel:

public interface IMessageChannel : IPeerMessageProcessor, IClientChannel
{
}

The constructor calls CreateChannel(), which creates the WCF channel objects:

private void CreateChannel()
{
//The InstanceContext wraps our processor, so messages arriving from the mesh are dispatched to it
_context = new InstanceContext(_processor);
_factory = new DuplexChannelFactory<IMessageChannel>(_context, _endPointName);
_participant = _factory.CreateChannel();
}

//Connect to resolver and join the mesh
internal void JoinNetwork()
{
//Check channel exists, if not create
if (_participant == null)
{
CreateChannel();
}
if (_participant == null)
{
throw new Exception("Could not create peer mesh channel.");
}
if (_participant.State == CommunicationState.Opened || _participant.State == CommunicationState.Opening)
return;

//Open the channel
_participant.Open();
//just in case wait for channel state to be opened
while (_participant.State != CommunicationState.Opened)
{
Thread.Sleep(15);
}
}

//Abort channel
internal void AbortNetwork()
{
//Abort (should something go wrong, we always need an abort)
if (_participant != null)
{
_participant.Abort();
_participant = null;
}
}

//Close the channel (leave the mesh)
internal void LeaveNetwork()
{
if (_participant != null)
{
_participant.Close();
}
_participant = null;
}

//Send a heartbeat message across the mesh
internal void SendHeartBeat(object context)
{
//As discussed this triggers the ProcessHeartbeat method in each
//clients instance of the MessageProcessor
_participant.ProcessHeartbeat();
}

//Send a message across the mesh
internal void SendMessage(PeerMessage message)
{
//As discussed this triggers the ProcessMessage method in each
//clients instance of the MessageProcessor
_participant.ProcessMessage(message);
}

OK, so the final stage: we need to be able to control the peer mesh instance, so yes, we're going to use a singleton again, PeerHelper. Here are its members:

private static readonly object _syncRoot = new object();
private static PeerHelper _instance;
//The specific processor contract instance
private static MessageProcessor _processor;
//The connection instance
private static PeerConnection _connection;

This singleton uses the same standard-pattern Instance property and an empty private constructor, so I'll let you figure that one out for yourselves.
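For completeness, here is one way to write that part. Note that this sketch swaps the lock-based pattern for `Lazy<T>` (available from .NET 4.0), which gives the same one-instance guarantee with less code; only the singleton members are shown, the rest of PeerHelper is as listed in this post.

```csharp
using System;
using System.Threading;

// Singleton members of PeerHelper only (a Lazy<T> sketch, not the original lock-based code).
public sealed class PeerHelper
{
    // ExecutionAndPublication: the factory runs at most once, even if many threads
    // read Instance at the same moment.
    private static readonly Lazy<PeerHelper> _lazyInstance =
        new Lazy<PeerHelper>(() => new PeerHelper(), LazyThreadSafetyMode.ExecutionAndPublication);

    public static PeerHelper Instance
    {
        get { return _lazyInstance.Value; }
    }

    // Empty private constructor: only the Lazy<T> factory above can create the instance.
    private PeerHelper()
    {
    }
}
```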
To start, we need to be able to join the network.

/// <summary>
/// Joins the peer to peer network
/// Uses the default message processor
/// </summary>
public void JoinNetwork()
{
JoinNetwork(new MessageProcessor());
}

/// <summary>
/// Join the peer to peer network
/// Uses a custom message processor implementation
/// </summary>
/// <param name="messageProcessor">custom message processor</param>
public void JoinNetwork(MessageProcessor messageProcessor)
{
_processor = messageProcessor;
ValidateConnection();
_connection.JoinNetwork();
}

And the ValidateConnection method used above (this just checks we have a valid connection, and creates one if need be).

private static void ValidateConnection()
{
//check if connection instance exists and has value
if (_connection == null)
{
//check if processor instance exists
if (_processor == null)
{
//if no processor use default contract processor
_processor = new MessageProcessor();
}
//if no connection create a new instance
_connection = new PeerConnection(_processor, ConfigurationManager.AppSettings["PeerMeshEndpointName"]);
}
}

Simple leave and abort methods:

/// <summary>
/// Leave the peer to peer network
/// </summary>
public void LeaveNetwork()
{
if (_connection != null)
{
_connection.LeaveNetwork();
}
_connection = null;
}

/// <summary>
/// Abort the peer to peer network
/// </summary>
public void AbortNetwork()
{
if (_connection != null)
{
_connection.AbortNetwork();
}
_connection = null;
}

And finally, the ones we've been waiting for: the send message and heartbeat methods.

/// <summary>
/// Send a peer message object across the peer network
/// </summary>
public void SendMessage(PeerMessage message)
{
ValidateConnection();
_connection.SendMessage(message);
}

/// <summary>
/// Send a heartbeat across the peer mesh
/// </summary>
public void SendHeartBeat()
{
ValidateConnection();
_connection.SendHeartBeat(this);
}

These are small methods; all the actual logic is held elsewhere. The point of the PeerHelper class is (1) to provide a single instance, and so protect against any multi-threading nastiness, and (2) to provide a clean interface for your clients to use.

Joining the mesh is now as simple as:
PeerHelper.Instance.JoinNetwork();

And sending a message:
//Remember to define your own PeerMessage object with constructor etc of your choice.
PeerHelper.Instance.SendMessage(new PeerMessage("This is a test client message"));

The following is a sample test console application:

using System;
using System.Threading;

namespace smp.TestPeerMeshSolution
{
class Program
{
static void Main(string[] args)
{
PeerHelper.Instance.JoinNetwork(new TestMessageProcessor());
while(true)
{
//send a message every 2 seconds.
PeerHelper.Instance.SendMessage(new PeerMessage("This is a test client message"));
Thread.Sleep(2000);
}
}
}

public class TestMessageProcessor : MessageProcessor
{
public override void ProcessMessage(PeerMessage message)
{
Console.WriteLine(message.MessageText);
base.ProcessMessage(message);
}

public override void ProcessHeartbeat()
{
Console.WriteLine("Heartbeat received");
base.ProcessHeartbeat();
}
}
}

Notice that you only see a message on screen if you have more than one console instance open. Also try shutting down the resolver service and watch what happens; pad out some methods with Console.WriteLine("...") calls to see what's going on in more detail.

I hope this is of some help to anyone who reads it. As always, these are just my thoughts, though I have performed some extensive testing of the core solution.
(Standard garb... I can offer no guarantees, and take no responsibility for use of anything discussed here...)

7 comments:

  1. Hi. I've just read your article and I have a couple questions about it. Can I email you to ask some questions? I couldn't find your email anywhere. Here's a temporary email address you can reach me at: lerkhern1234@mailinator.com

    Thanks!

  2. Apologies for the delayed response; give me a shout at simonpotter.dev@gmail if you still need help.

  3. Great post! I've been looking for a framework that would leverage WCF peer processing to scale out a process across multiple machines in an enterprise. This looks like a great starting point.

    Basically, the idea would be to use the peer2peer stuff for peer tracking (who's available, who's busy, that sort of thing), and then use a direct nettcp connection when actual processing work needed to be farmed out (ie wouldn't want to swamp all the nodes with a job that only the dest node needs to see).

    Anyway, good stuff. I'm looking forward to playing more with WCF.

  4. Hi great Post! Could you share the VisualStudio-Solution with us?

  5. Will do, I need to dig it out. I have
    two prototype solutions: one is IPv4-based (Server 2003) and requires a resolver service; the other is pure p2p and relies on IPv6 protocols. It works fine on all Windows versions since (and including) XP, providing the appropriate modules are installed, and is OK with Server 2008.

  6. This is awesome. Is the Visual Studio version posted anywhere?

  7. The latest post mentions this, but the source code can be found at
    http://svn2.xp-dev.com/svn/smpPeerMesh/trunk/PeerResolver1.0/
    Use any standard SVN client to get it. Again, I take no responsibility (it does work though, to be fair) for its use or for code based on its practices...
