Token Based Asynchronous Socket Server On .NET 4

by max 19. March 2012 08:12

Even though many technologies and approaches exist for network communication, such as Windows Communication Foundation or web servers handling HTTP POST/GET requests, some tasks still require plain TCP sockets. I’ve worked with sockets a lot recently, and one of my tasks was an intermediate socket service that provides communication between an automated robot and client software. The first, naïve solution was the following:

  1. The client sends a command
  2. The socket server receives the command and starts executing it
  3. Once command execution is done, the server sends a response to the client

There is nothing wrong with this solution except for one thing: the time between steps 2 and 3 can be really long, up to several minutes. The socket server was written on .NET 4 and the client in Ruby 1.9. For reasons I could not pin down (some internet research suggested that this is a Ruby bug), when the waiting time exceeds ~1 minute the client never gets the server’s response, even though the server has clearly sent it.

Here I’ll show one possible solution that eliminates this problem and makes the overall design more flexible. We will use a token-based server, which means that the server and the client exchange a token as a handshake to confirm that all messages were delivered correctly.

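The basis for the server socket can be found in the following MSDN articles:

  * Using an Asynchronous Server Socket: http://msdn.microsoft.com/en-us/library/5w7b7x5f.aspx
  * Asynchronous Server Socket Example: http://msdn.microsoft.com/en-us/library/fx6588te.aspx

We will start from that code and build more functionality on top of it.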

Our scenario is the following:

  1. The client initiates the connection by sending a command together with a token for that command
  2. The server receives the command, stores its token in the local token cache and sends a response immediately
  3. The server executes the command on a separate thread
  4. The client sends requests with the “check status” command and the token
  5. If the command has not finished yet, the server replies that this token is currently “busy”
  6. Once the command has finished, the server replies “finished” to the “check status” request
  7. The client confirms that the command has finished, and the server cleans up the token cache after the confirmation
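The seven steps above can be modelled as a small state machine. The following is only a minimal sketch in portable standard C++ (not the C++/CLI used by the server), and the names `TokenServer`, `submit`, `finishWork`, `checkStatus` and `confirm`, as well as the "unknown" reply, are hypothetical and do not appear in the server code. The background thread from step 3 is simulated by calling `finishWork` by hand.

```cpp
#include <string>

// Hypothetical model of one token's life cycle (steps 1-7 above).
// The real server runs the command on a separate thread; here the
// end of that work is simulated by calling finishWork().
class TokenServer {
public:
    // Steps 1-2: remember the token and answer immediately with "busy".
    std::string submit(const std::string& token) {
        token_ = token;
        hasToken_ = true;
        finished_ = false;
        return "busy";
    }

    // Step 3 (simulated): the command has finished executing.
    void finishWork() {
        if (hasToken_) finished_ = true;
    }

    // Steps 4-6: report "busy" until the work is done, then "finished".
    // "unknown" is an assumption of this sketch for tokens never submitted.
    std::string checkStatus(const std::string& token) const {
        if (!hasToken_ || token != token_) return "unknown";
        return finished_ ? "finished" : "busy";
    }

    // Step 7: the client confirms; the cached token is dropped only
    // if the command has really finished.
    bool confirm(const std::string& token) {
        if (!hasToken_ || token != token_ || !finished_) return false;
        hasToken_ = false;
        token_.clear();
        return true;
    }

private:
    std::string token_;
    bool hasToken_ = false;
    bool finished_ = false;
};
```

A client would call `submit` once, poll `checkStatus` until it stops returning "busy", and then call `confirm`. Because every request is answered immediately, no single wait comes close to the one-minute limit described earlier.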


The code I’m providing here is just an example, a prototype, and it has some restrictions:

  1. Only one command can be held in the token cache at a time
  2. If the client sends another command before confirming the previous one, the server ignores it and replies with the status of the previous token
  3. Only dummy commands such as “say hi” or “say bye” are supported, which is sufficient for demo purposes
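Restrictions 1 and 2 boil down to a cache with a single slot. The sketch below illustrates that rule in portable standard C++; `SingleSlotCache`, `accept`, `confirm` and `busy` are hypothetical names chosen for this illustration. As in the token cache shown later, an empty string stands for "no token in progress".

```cpp
#include <string>

// Hypothetical single-slot cache illustrating restrictions 1 and 2:
// while one token is pending, any other command is answered with the
// pending token instead of being accepted.
class SingleSlotCache {
public:
    // Returns the token the server will report on: the new one if the
    // slot was free, otherwise the token that is still unconfirmed.
    std::string accept(const std::string& token) {
        if (pending_.empty()) pending_ = token;
        return pending_;
    }

    // Frees the slot once the client has confirmed the pending token.
    void confirm() { pending_.clear(); }

    // True while a token is waiting for confirmation.
    bool busy() const { return !pending_.empty(); }

private:
    std::string pending_;  // empty string means "no token in progress"
};
```

With this rule, a client that sends token "b" while "a" is still unconfirmed gets the status of "a" back, so it can tell from the token in the reply that its new command was not accepted.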

We will use JSON as the exchange format, together with an awesome JSON library for .NET: http://json.codeplex.com/
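To make the wire format concrete: each request is a JSON object with a `command` and a `token` field, followed by the `<end_of_message>` marker that the receive callback in this post looks for. The sketch below, in portable standard C++, shows one way a message could be framed and how the JSON part can be cut out again. The helper names `frameMessage` and `extractJson` are hypothetical, and no JSON escaping is performed, so this is an illustration rather than a replacement for a JSON library.

```cpp
#include <string>

// Marker the server's receive callback looks for.
const std::string kEndOfMessage = "<end_of_message>";

// Builds the text a client would send for a given command and token.
// No JSON escaping is done, so both arguments must be plain strings
// without quotes or backslashes in this sketch.
std::string frameMessage(const std::string& command, const std::string& token) {
    return "{\"command\":\"" + command + "\",\"token\":\"" + token + "\"}" +
           kEndOfMessage;
}

// Mirrors the server side: keep everything up to and including the
// last '}'. Returns an empty string when no closing brace is present.
std::string extractJson(const std::string& content) {
    const std::string::size_type pos = content.rfind('}');
    return pos == std::string::npos ? std::string()
                                    : content.substr(0, pos + 1);
}
```

For example, `frameMessage("say_hi", "abc123")` yields `{"command":"say_hi","token":"abc123"}<end_of_message>`, and `extractJson` applied to that text returns the JSON object without the marker.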

The server code is written in C++/CLI and uses CLR classes. It is easily portable to C# (just a matter of syntax in this case).

I. Define the command:

Commands.h
/// Namespaces used by this listing
using namespace System;
using namespace System::Collections;
using namespace Newtonsoft::Json;

/// <summary>
/// Enumeration with supported commands
/// </summary>
public enum class CommandsList
{
    NOT_INITIALIZED = 0,
    INIT,
    SAY_HI,
    SAY_BYE,
    CHECK_STATUS,
    CONFIRM,

    UNSUPPORTED = -1
};

/// <summary>
/// Serializable to JSON command with token
/// </summary>
public ref class Command
{
public:

    Command();

    #pragma region Utils

    /// <summary>
    /// Converts string to commands enum
    /// </summary>
    static CommandsList StringToCommand(String ^command);
    /// <summary>
    /// Checks if command is valid to be processed
    /// </summary>
    static bool IsValid(CommandsList cmd);

    #pragma endregion

    #pragma region Public Properties

    /// <summary>
    /// Command name
    /// </summary>
    [JsonProperty]
    property String ^command;
    /// <summary>
    /// Token associated with the command
    /// </summary>
    [JsonProperty]
    property String ^token;

    #pragma endregion

private:

    #pragma region Private Members

    /// <summary>
    /// Stores associations string - commands enum
    /// </summary>
    static Hashtable ^commandsMap;

    #pragma endregion
};

 

Commands.cpp
/// Fill up the commands hashtable with name - enum associations.
/// Note: the map is static but filled here, so a Command has to be
/// constructed at least once before StringToCommand can resolve names.
Command::Command()
{
    #pragma region Command name to enum mapping

    commandsMap = gcnew Hashtable();
    commandsMap["init"] = CommandsList::INIT;
    commandsMap["say_hi"] = CommandsList::SAY_HI;
    commandsMap["say_bye"] = CommandsList::SAY_BYE;
    commandsMap["check_status"] = CommandsList::CHECK_STATUS;
    commandsMap["confirm"] = CommandsList::CONFIRM;

    #pragma endregion
}

/// If command name was not found in the hashtable, return unsupported
CommandsList Command::StringToCommand(String ^command)
{
    /// Guard against a missing name or a map that was never filled
    if (nullptr != command &&
        nullptr != commandsMap &&
        commandsMap->ContainsKey(command))
    {
        return safe_cast<CommandsList>(commandsMap[command]);
    }

    return CommandsList::UNSUPPORTED;
}

/// Command is not valid if it is unsupported or not initialized
bool Command::IsValid(CommandsList cmd)
{
    return (CommandsList::NOT_INITIALIZED != cmd) &&
           (CommandsList::UNSUPPORTED != cmd);
}

II. Define the commands processor:

CommandsProcessor.h
/// Namespaces used by this listing
using namespace System;

/// <summary>
/// Class responsible for processing every command
/// </summary>
public ref class CommandsProcessor
{
public:

    CommandsProcessor();

    /// <summary>
    /// Chooses which command to process
    /// </summary>
    /// <param name="command">Command object</param>
    /// <param name="tokenCache">Token cache object</param>
    void ProcessCommand(Command ^command, TokenCache^ tokenCache);

private:

    #pragma region Every Command Processor

    /// Individual processors for every command

    bool _isInitialized;
    void ProcessInit(Command ^command, TokenCache^ tokenCache);

    void ProcessHi(Command ^command, TokenCache^ tokenCache);
    void ProcessBye(Command ^command, TokenCache^ tokenCache);

    #pragma endregion

    #pragma region Exceptions Handlers

    /// Handlers for exceptions to store and to include in service reply

    void HandleManagedException(Exception^ exception);
    void EmptyExceptionMessage();

    ServiceException^ _exception;

    #pragma endregion
};

 

CommandsProcessor.cpp
/// Constructor
CommandsProcessor::CommandsProcessor()
{
    _isInitialized = false;
    _exception = gcnew ServiceException();
}

/// Process command, emulate processing for this demo
void CommandsProcessor::ProcessCommand(Command ^command, TokenCache ^tokenCache)
{
    //TODO: EMULATION ONLY
    Console::WriteLine("--SLEEP--\n");
    Thread::Sleep(10000);
    Console::WriteLine("--WAKE UP--\n");
    /////////////////

    /// Parse command and process it
    CommandsList cmd = Command::StringToCommand(command->command);
    switch (cmd)
    {
    case CommandsList::INIT:
        {
            ProcessInit(command, tokenCache);
            break;
        }
    case CommandsList::SAY_HI:
        {
            ProcessHi(command, tokenCache);
            break;
        }
    case CommandsList::SAY_BYE:
        {
            ProcessBye(command, tokenCache);
            break;
        }
    case CommandsList::CHECK_STATUS:
    case CommandsList::CONFIRM:
    default:
        {
            break;
        }
    }

    /// We already caught all exceptions in every command processor
    EmptyExceptionMessage();
}

#pragma region Every Command Processor

#pragma region Init
void CommandsProcessor::ProcessInit(Command ^command, TokenCache ^tokenCache)
{
    try
    {
        /// TODO: Insert specific processing here
        _isInitialized = true;
    }
    catch (Exception ^exception)
    {
        /// Store exception in order to include it in report
        HandleManagedException(exception);
    }

    /// Form JSON formatted reply
    JObject ^o = gcnew JObject(
        gcnew JProperty("command", command->command),
        gcnew JProperty("exception_message", _exception->_exceptionMessage),
        gcnew JProperty("is_init", _isInitialized));

    /// Set status and report in the token cache
    tokenCache->SetTokenResponse(command, o);
    tokenCache->SetTokenStatus(command,
                               (_exception->_isException) ?
                                   TOKEN_ABORTED : TOKEN_SUCCESS,
                               true);
    tokenCache->WriteTokenFile(tokenCache->GetToken(command));
}
#pragma endregion

#pragma region Hi
void CommandsProcessor::ProcessHi(Command ^command, TokenCache ^tokenCache)
{
    try
    {
        /// TODO: Insert specific processing here
    }
    catch (Exception ^exception)
    {
        /// Store exception in order to include it in report
        HandleManagedException(exception);
    }

    /// Form JSON formatted reply
    JObject ^o = gcnew JObject(
        gcnew JProperty("command", command->command),
        gcnew JProperty("exception_message", _exception->_exceptionMessage),
        gcnew JProperty("message", "Hi!"));

    /// Set status and report in the token cache
    tokenCache->SetTokenResponse(command, o);
    tokenCache->SetTokenStatus(command,
                               (_exception->_isException) ?
                                   TOKEN_ABORTED : TOKEN_SUCCESS,
                               true);
    tokenCache->WriteTokenFile(tokenCache->GetToken(command));
}
#pragma endregion

#pragma region Bye
void CommandsProcessor::ProcessBye(Command ^command, TokenCache ^tokenCache)
{
    try
    {
        /// TODO: Insert specific processing here
    }
    catch (Exception ^exception)
    {
        /// Store exception in order to include it in report
        HandleManagedException(exception);
    }

    /// Form JSON formatted reply
    JObject ^o = gcnew JObject(
        gcnew JProperty("command", command->command),
        gcnew JProperty("exception_message", _exception->_exceptionMessage),
        gcnew JProperty("message", "Bye!"));

    /// Set status and report in the token cache
    tokenCache->SetTokenResponse(command, o);
    tokenCache->SetTokenStatus(command,
                               (_exception->_isException) ?
                                   TOKEN_ABORTED : TOKEN_SUCCESS,
                               true);
    tokenCache->WriteTokenFile(tokenCache->GetToken(command));
}
#pragma endregion

#pragma endregion

#pragma region Exceptions Handlers

/// Store exception in order to include it in report
void CommandsProcessor::HandleManagedException(Exception^ exception)
{
    _exception->_isException = true;
    _exception->_exceptionMessage = gcnew String(exception->Message);
}

/// Clean up stored exception
void CommandsProcessor::EmptyExceptionMessage()
{
    _exception->_isException = false;
    _exception->_exceptionMessage = String::Empty;
}

#pragma endregion

Here we use a helper class to store exceptions:

Exceptions.h
/// <summary>
/// Exceptions helper: stores exception to include in reply
/// </summary>
public ref class ServiceException
{
public:
    ServiceException() :
        _isException(false),
        _exceptionMessage(nullptr)
    { }

    bool _isException;
    String^ _exceptionMessage;
};

III. Define the local Token Cache for storing the current command’s status and info.

This class stores the token status in memory and on the file system for emergency cases (for example, the computer losing power before the token has been confirmed).

TokenCache.h
/// Namespaces used by this listing
using namespace System;
using namespace System::Collections::Generic;
using namespace System::Runtime::InteropServices;
using namespace Newtonsoft::Json;
using namespace Newtonsoft::Json::Linq;

/// <summary>
/// Enumeration with token statuses
/// </summary>
public enum TokenStatuses
{
    TOKEN_BUSY = 0,
    TOKEN_SUCCESS,
    TOKEN_ABORTED,
    TOKEN_BAD_COMMAND = -1
};

/// <summary>
/// Helper class: converts token status from enum to string
/// </summary>
public ref class TokenConverter
{
public:
    static String^ TokenStatusToString(TokenStatuses status);
};

/// <summary>
/// Serializable to JSON token status
/// </summary>
public ref class TokenStatus
{
public:

    #pragma region Public Properties

    /// <summary>
    /// Token associated with the command
    /// </summary>
    [JsonProperty]
    property String ^token;
    /// <summary>
    /// Token status
    /// </summary>
    [JsonProperty]
    property String ^status;
    /// <summary>
    /// Is command processed?
    /// </summary>
    [JsonProperty]
    property bool finished;
    /// <summary>
    /// JSON response for current command
    /// </summary>
    [JsonProperty]
    property JObject ^response;

    #pragma endregion

    TokenStatus(String ^_token);
};

/// <summary>
/// Token Cache class responsible for Set/Get/Remove current token
/// Limitations of this version: only one token could be set up
/// But easily extendable to unlimited number of tokens
/// </summary>
public ref class TokenCache
{
public:

    #pragma region Token Operations

    /// Token operations: Set/Get/Remove

    static TokenStatus ^GetToken(Command^ cmd);
    static bool SetToken(Command^ cmd, TokenStatus^ token);
    static bool SetTokenStatus(Command^ cmd, TokenStatuses status, bool finished);
    static bool SetTokenResponse(Command^ cmd, JObject ^response);
    static bool RemoveToken(Command^ cmd);

    /// Store last operation in a token file for emergency cases

    static bool CheckTokenFile([Out] TokenStatus ^%token);
    static void WriteTokenFile(TokenStatus ^token);
    static void DeleteTokenFile();

    #pragma endregion

private:

    #pragma region Private Members

    /// <summary>
    /// Stores associations token - status
    /// </summary>
    static Dictionary<String^, TokenStatus^> ^_tokens
        = gcnew Dictionary <String^, TokenStatus^>();

    /// If new token came, check for previous token
    static String^ prevToken = String::Empty;

    #pragma endregion
};

 

TokenCache.cpp
/// Namespaces used by this listing
using namespace System::IO;
using namespace System::IO::IsolatedStorage;

/// Token status constructor, token is busy by default
TokenStatus::TokenStatus(String^ _token)
{
    token = _token;
    status = TokenConverter::TokenStatusToString(TOKEN_BUSY);
    finished = false;
    response = nullptr;
}

/// Gets token
/// Locks tokens dictionary
/// If previous token is not empty, use it and ignore new token
TokenStatus^ TokenCache::GetToken(Command^ cmd)
{
    TokenStatus ^t = nullptr;

    System::Threading::Monitor::Enter(_tokens);
    try
    {
        if (String::Empty == prevToken)
        {
            _tokens->TryGetValue(cmd->token, t);
        }
        else
        {
            _tokens->TryGetValue(prevToken, t);
        }
    }
    finally
    {
        System::Threading::Monitor::Exit(_tokens);
    }

    return t;
}

/// Removes token
/// Locks tokens dictionary
/// Removes current token from cache if it finished
bool TokenCache::RemoveToken(Command^ cmd)
{
    bool res = false;
    TokenStatus ^t = nullptr;

    System::Threading::Monitor::Enter(_tokens);
    try
    {
        if (String::Empty == prevToken)
        {
            if (_tokens->TryGetValue(cmd->token, t))
            {
                if (t->finished)
                {
                    _tokens->Remove(cmd->token);
                    res = true;
                }
            }
        }
        else
        {
            if (_tokens->TryGetValue(prevToken, t))
            {
                if (t->finished)
                {
                    _tokens->Remove(prevToken);
                    prevToken = String::Empty;
                    res = true;
                }
            }
        }
    }
    finally
    {
        System::Threading::Monitor::Exit(_tokens);
    }

    return res;
}

/// Sets token
/// Locks tokens dictionary
/// If no current tokens in progress, sets new token
bool TokenCache::SetToken(Command^ cmd, TokenStatus^ token)
{
    bool res = false;

    System::Threading::Monitor::Enter(_tokens);
    try
    {
        if (String::Empty == prevToken)
        {
            if (!_tokens->ContainsKey(cmd->token))
            {
                _tokens[cmd->token] = token;
                prevToken = cmd->token;
                res = true;
            }
        }
    }
    finally
    {
        System::Threading::Monitor::Exit(_tokens);
    }

    return res;
}

/// Sets status for current token
/// Locks tokens dictionary
bool TokenCache::SetTokenStatus(Command^ cmd, TokenStatuses status, bool finished)
{
    bool res = false;

    System::Threading::Monitor::Enter(_tokens);
    try
    {
        TokenStatus ^t = nullptr;
        if (_tokens->TryGetValue(cmd->token, t))
        {
            t->status = TokenConverter::TokenStatusToString(status);
            t->finished = finished;
            _tokens[cmd->token] = t;
            res = true;
        }
    }
    finally
    {
        System::Threading::Monitor::Exit(_tokens);
    }

    return res;
}

/// Sets response for current token
/// Locks tokens dictionary
bool TokenCache::SetTokenResponse(Command^ cmd, JObject ^response)
{
    bool res = false;

    System::Threading::Monitor::Enter(_tokens);
    try
    {
        TokenStatus ^t = nullptr;
        if (_tokens->TryGetValue(cmd->token, t))
        {
            t->response = response;
            res = true;
        }
    }
    finally
    {
        System::Threading::Monitor::Exit(_tokens);
    }

    return res;
}

/// Recovers token from isolated file cache
bool TokenCache::CheckTokenFile([Out] TokenStatus ^%token)
{
    bool fileExists = false;
    token = gcnew TokenStatus(String::Empty);

    try
    {
        IsolatedStorageFile ^isoFile = IsolatedStorageFile::GetMachineStoreForAssembly();
        IsolatedStorageFileStream ^isoStream = gcnew IsolatedStorageFileStream("token.last", FileMode::Open, isoFile);

        String ^tokenData = nullptr;
        try
        {
            StreamReader ^sr = gcnew StreamReader(isoStream);
            try
            {
                tokenData = sr->ReadToEnd();
            }
            finally
            {
                delete sr;
            }
            fileExists = true;
        }
        catch (...) { }
        if (!String::IsNullOrEmpty(tokenData))
        {
            try
            {
                token = JsonConvert::DeserializeObject<TokenStatus^>(tokenData);
            }
            catch (...) { }
        }
        isoFile->Close();
    }
    catch (...) { }

    return fileExists;
}

/// Backs up token to isolated file cache
void TokenCache::WriteTokenFile(TokenStatus ^token)
{
    try
    {
        IsolatedStorageFile ^isoFile = IsolatedStorageFile::GetMachineStoreForAssembly();
        IsolatedStorageFileStream ^isoStream = gcnew IsolatedStorageFileStream("token.last", FileMode::Create, FileAccess::Write, isoFile);

        try
        {
            JsonSerializer ^serializer = gcnew JsonSerializer();
            StreamWriter ^sw = gcnew StreamWriter(isoStream);
            try
            {
                JsonWriter ^writer = gcnew JsonTextWriter(sw);
                try
                {
                    serializer->Serialize(writer, token);
                }
                finally
                {
                    delete writer;
                }
            }
            finally
            {
                delete sw;
            }
        }
        catch (...) { }
        /// Close the store first, then dispose it (not the other way round)
        isoFile->Close();
        delete isoFile;
    }
    catch (...) { }
}

/// Cleans up isolated tokens file
void TokenCache::DeleteTokenFile()
{
    try
    {
        IsolatedStorageFile ^isoFile = IsolatedStorageFile::GetMachineStoreForAssembly();
        isoFile->DeleteFile("token.last");
        /// Close the store first, then dispose it (not the other way round)
        isoFile->Close();
        delete isoFile;
    }
    catch (...) { }
}

/// Converts token status to string
String^ TokenConverter::TokenStatusToString(TokenStatuses status)
{
    String^ retStr = String::Empty;

    switch (status)
    {
    case TOKEN_BUSY:
        retStr = "busy";
        break;
    case TOKEN_SUCCESS:
        retStr = "success";
        break;
    case TOKEN_ABORTED:
        retStr = "aborted";
        break;
    case TOKEN_BAD_COMMAND:
    default:
        retStr = "bad_command";
        break;
    }

    return retStr;
}

IV. Finally, define the threaded command processor and the Socket Server itself:

SocketServer.h
/// Namespaces used by this listing
using namespace System;
using namespace System::Net;
using namespace System::Net::Sockets;
using namespace System::Text;
using namespace System::Threading;

/// <summary>
/// Threaded object responsible for just one command processing
/// </summary>
public ref class SingleCommandProcessor
{
public:

    /// <summary>
    /// Constructor
    /// </summary>
    /// <param name="currCmd">Command to process</param>
    /// <param name="tokenCache">Token cache object</param>
    /// <param name="commandsProcessor">Commands processor object</param>
    SingleCommandProcessor(
        Command^ currCmd,
        TokenCache ^tokenCache,
        CommandsProcessor ^commandsProcessor) :
            _currCmd(currCmd),
            _tokenCache(tokenCache),
            _commandsProcessor(commandsProcessor)
    {}

    /// <summary>
    /// Thread function to process current command
    /// </summary>
    void DoProcess();

private:
    Command ^_currCmd;
    TokenCache ^_tokenCache;
    CommandsProcessor ^_commandsProcessor;
};

/// <summary>
/// State object for reading client data asynchronously
/// </summary>
public ref class StateObject
{
public:
    StateObject();

    Socket ^workSocket;
    literal int BufferSize = 1024;
    array<Byte> ^buffer;
    StringBuilder ^sb;

private:
    bool _initialized;
    void InitializeInstanceFields();
};

/// <summary>
/// Socket Server exactly as described on MSDN articles:
///
/// Using an Asynchronous Server Socket:
/// http://msdn.microsoft.com/en-us/library/5w7b7x5f.aspx
///
/// Asynchronous Server Socket Example
/// http://msdn.microsoft.com/en-us/library/fx6588te.aspx
/// </summary>
public ref class SocketServer
{
public:
    SocketServer(String ^ipAddress, int port);

    void Run(String ^ipAddress, int port, int backlog);
    void Stop();

private:
    void AcceptCallback(IAsyncResult ^ar);
    void ReceiveCallback(IAsyncResult ^ar);
    void SendCallback(IAsyncResult ^ar);

    SocketPermission ^_permission;
    Socket ^_sListener;
    static ManualResetEvent ^allDone = gcnew ManualResetEvent(false);

    CommandsProcessor ^_commandsProcessor;
    TokenCache ^_tokenCache;
};

The code for the asynchronous callbacks is the same as in the MSDN examples (links are above) except for the receive callback. The logic of the receive callback is the following:

  1. Parse the command
  2. If the command is valid, check for tokens
  3. If a token has not finished yet => reply with the current token
  4. If there are no current tokens => reply with a new token

SocketServer.cpp
/// Namespace needed for PermissionState
using namespace System::Security::Permissions;

/// State object
StateObject::StateObject()
{
    InitializeInstanceFields();
}

/// Initialize state object
void StateObject::InitializeInstanceFields()
{
    if (!_initialized)
    {
        buffer = gcnew array<Byte>(BufferSize);
        sb = gcnew StringBuilder();

        _initialized = true;
    }
}

/// Thread function for single command processing
void SingleCommandProcessor::DoProcess()
{
    _commandsProcessor->ProcessCommand(_currCmd, _tokenCache);
}

/// Socket Server constructor
SocketServer::SocketServer(String ^ipAddress, int port)
{
    _permission = gcnew SocketPermission(PermissionState::Unrestricted);
    _commandsProcessor = gcnew CommandsProcessor();
    _tokenCache = gcnew TokenCache();

    _sListener = nullptr;
}

/// Run Socket Server
void SocketServer::Run(String ^ipAddress, int port, int backlog)
{
    _permission->Demand();

    IPAddress ^ipAddr = IPAddress::Parse(ipAddress);
    IPEndPoint ^ipEndPoint = gcnew IPEndPoint(ipAddr, port);

    _sListener = gcnew Socket(ipAddr->AddressFamily, SocketType::Stream, ProtocolType::Tcp);
    _sListener->Bind(ipEndPoint);
    _sListener->Listen(backlog);

    while (true) {
        allDone->Reset();

        Console::WriteLine("Waiting for a connection on port " +
            Convert::ToString(ipEndPoint->Address) + ":" +
            Convert::ToString(ipEndPoint->Port));

        AsyncCallback ^aCallback = gcnew AsyncCallback(this, &SocketServer::AcceptCallback);
        _sListener->BeginAccept(aCallback, _sListener);

        allDone->WaitOne();
    }
}

/// Stop Socket Server
void SocketServer::Stop()
{
    /// A listening socket is never in the Connected state, so checking
    /// Connected would skip the cleanup; closing it is enough
    if (nullptr != _sListener)
    {
        _sListener->Close();
    }
}

/// Accept incoming socket connection
void SocketServer::AcceptCallback(IAsyncResult ^ar)
{
    Socket ^listener = nullptr;
    Socket ^handler = nullptr;

    allDone->Set();

    listener = safe_cast<Socket^>(ar->AsyncState);
    handler = listener->EndAccept(ar);

    StateObject ^state = gcnew StateObject();
    state->workSocket = handler;
    handler->BeginReceive(
        state->buffer,
        0,
        StateObject::BufferSize,
        SocketFlags::None,
        gcnew AsyncCallback(this, &SocketServer::ReceiveCallback),
        state);
}

/// Receive incoming socket data
void SocketServer::ReceiveCallback(IAsyncResult ^ar)
{
    bool startNewProcessingThread = false;

    StateObject ^state = safe_cast<StateObject^>(ar->AsyncState);
    Socket ^handler = state->workSocket;

    String ^content = String::Empty;
    String ^response = String::Empty;

    int bytesRead = handler->EndReceive(ar);
    String ^newLine = Encoding::ASCII->GetString(state->buffer, 0, bytesRead);
    if (bytesRead > 0)
    {
        state->sb->Append(newLine);
        handler->BeginReceive(
            state->buffer,
            0,
            StateObject::BufferSize,
            SocketFlags::None,
            gcnew AsyncCallback(this, &SocketServer::ReceiveCallback),
            state);
    }
    if (newLine->Contains("<end_of_message>"))
    {
        if (state->sb->Length > 0)
        {
            Command ^command = nullptr;
            content = state->sb->ToString();

            Console::WriteLine("Read " +
                Convert::ToString(content->Length) +
                " bytes from client.\n Data: " +
                content + "\n");

            /*
                1) Parse command
                2) If valid command check for tokens
                3) If token not finished => reply with current token
                4) If no current tokens => reply with new token
            */

            CommandsList cmd = CommandsList::NOT_INITIALIZED;
            try
            {
                /// Get JSON string from received message
                String ^json = content->Substring(0, content->LastIndexOf("}") + 1);
                /// Deserialize to command
                command = JsonConvert::DeserializeObject<Command^>(json);
                /// Convert command name to commands enum
                cmd = Command::StringToCommand(command->command);
            }
            catch (...) { }

            /// Check if received command is valid
            if (Command::IsValid(cmd))
            {
                TokenStatus ^t;

                /// Check if we have file-cached tokens
                if (!_tokenCache->CheckTokenFile(t))
                {
                    /// Get token from token cache if no file-cached tokens
                    t = _tokenCache->GetToken(command);
                }

                /// If we have cached token
                if (nullptr != t)
                {
                    /// If command is confirmation, remove token from cache
                    if (CommandsList::CONFIRM == cmd)
                    {
                        bool tokenDone = _tokenCache->RemoveToken(command);
                        if (tokenDone)
                        {
                            _tokenCache->DeleteTokenFile();
                        }

                        /// Reply with status of confirmation
                        response = (gcnew JObject(
                            gcnew JProperty("status",
                                TokenConverter::TokenStatusToString(
                                    (tokenDone ? TOKEN_SUCCESS : TOKEN_ABORTED)))))->ToString();
                    }
                    /// Serialize response from token cache
                    else
                    {
                        response = JsonConvert::SerializeObject(t, Newtonsoft::Json::Formatting::Indented);
                    }
                }
                /// Create new token if we don't have any
                else
  186.                 {
  187.                     t = gcnew TokenStatus(command->token);
  188.                     _tokenCache->SetToken(command, t);
  189.                     startNewProcessingThread = true;
  190.                     response = JsonConvert::SerializeObject(t, Newtonsoft::Json::Formatting::Indented);
  191.                 }
  192.             }
  193.             else
  194.             {
  195.                 /// Got unsupported command, reply
  196.                 JObject ^o = gcnew JObject(
  197.                     gcnew JProperty("status", TokenConverter::TokenStatusToString(TOKEN_BAD_COMMAND)));
  198.  
  199.                 response = o->ToString();
  200.             }
  201.  
  202.             array<Byte> ^byteData = Encoding::Unicode->GetBytes(response);
  203.  
  204.             Console::WriteLine("Sending: \n" +
  205.                 response + "\n");
  206.             
  207.             handler->BeginSend(
  208.                 byteData,
  209.                 0,
  210.                 byteData->Length,
  211.                 SocketFlags::None,
  212.                 gcnew AsyncCallback(this, &SocketServer::SendCallback),
  213.                 handler);
  214.             
  215.             state->sb->Clear();
  216.  
  217.             /// If we didn't have active tokens and got a new command
  218.             /// Process it in a separate thread
  219.             if (startNewProcessingThread)
  220.             {
  221.                 SingleCommandProcessor^ threadWork =
  222.                     gcnew SingleCommandProcessor(command, _tokenCache, _commandsProcessor);
  223.                 Thread^ newThread = gcnew Thread( gcnew ThreadStart( threadWork, &SingleCommandProcessor::DoProcess ) );
  224.                 newThread->Start();
  225.             }
  226.         }
  227.     }
  228. }
  229.  
  230. /// Send data to client
  231. void SocketServer::SendCallback(IAsyncResult ^ar)
  232. {
  233.     Socket ^handler = safe_cast<Socket^>(ar->AsyncState);
  234.     int bytesSend = handler->EndSend(ar);
  235.  
  236.     Console::WriteLine("Sent "+
  237.         Convert::ToString(bytesSend) +
  238.         "bytes to client." + "\n");
  239.     
  240.     handler->Disconnect(true);
  241. }
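
A side note on message framing. TCP is a byte stream, so one client message may arrive in several pieces, and the "<end_of_message>" marker itself can be split between two reads. Below is a minimal sketch of that accumulation logic in standard C++ (not the C++/CLI of the listing above). The MessageFramer class and its feed method are hypothetical names used only for illustration; they are not part of the server code.

```cpp
#include <cassert>
#include <string>
#include <utility>

// Hypothetical helper (not part of the article's server): accumulates
// received chunks and reports when a complete message has been buffered.
class MessageFramer {
public:
    explicit MessageFramer(std::string terminator = "<end_of_message>")
        : terminator_(std::move(terminator)) {
        assert(!terminator_.empty());
    }

    // Feed one received chunk. Returns true and fills `message` (without the
    // terminator) once a full message is buffered; returns false otherwise.
    bool feed(const std::string& chunk, std::string& message) {
        buffer_ += chunk;
        // Search the whole buffer, so a terminator split across chunks is found
        const std::string::size_type pos = buffer_.find(terminator_);
        if (pos == std::string::npos) {
            return false;  // keep receiving
        }
        message = buffer_.substr(0, pos);
        // Keep whatever followed the terminator for the next message
        buffer_.erase(0, pos + terminator_.size());
        return true;
    }

private:
    std::string terminator_;
    std::string buffer_;
};
```

The point of the design is that the search runs over everything received so far, never over the last chunk alone.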

V. To test the server functionality, here is a simple client written in C#:

Client.cs
using System;
using System.IO;
using System.Net;
using System.Net.Sockets;
using System.Security.Permissions;
using System.Text;
using Newtonsoft.Json;

/// <summary>
/// Serializable to JSON command with token
/// </summary>
public class Command
{
    [JsonProperty]
    public String command;

    [JsonProperty]
    public String token;
}

class Client
{
    static void Main(string[] args)
    {
        PrintHelp();

        byte[] bytes = new byte[1024];
        string line = String.Empty;

        Command cmd = new Command();

        /// Read user command to send to server
        while (true)
        {
            line = Console.ReadLine();
            if (line == null || line == "exit")
            {
                break;
            }

            /// Non-numeric input falls through to the default case instead of throwing
            int option;
            if (!Int32.TryParse(line, out option))
            {
                option = -1;
            }

            switch (option)
            {
                case 0:
                    /// Reuses the token of the last command
                    cmd.command = "check_status";
                    break;
                case 1:
                    cmd.command = "init";
                    cmd.token = GenerateToken();
                    break;
                case 2:
                    cmd.command = "say_hi";
                    cmd.token = GenerateToken();
                    break;
                case 3:
                    cmd.command = "say_bye";
                    cmd.token = GenerateToken();
                    break;
                case 4:
                    /// Reuses the token of the last command
                    cmd.command = "confirm";
                    break;
                default:
                    /// Unsupported command, the server should reject it
                    PrintHelp();
                    cmd.command = "echo";
                    break;
            }

            try
            {
                SocketPermission permission = new SocketPermission(PermissionState.Unrestricted);
                permission.Demand();
                IPAddress ipAddr = IPAddress.Parse("127.0.0.1");
                IPEndPoint ipEndPoint = new IPEndPoint(ipAddr, 8081);
                Socket sender = new Socket(
                    ipAddr.AddressFamily,
                    SocketType.Stream,
                    ProtocolType.Tcp
                    );

                sender.Connect(ipEndPoint);
                Console.WriteLine("Socket connected to {0}",
                    sender.RemoteEndPoint.ToString());

                /// Serialize command
                string theMessage = JsonConvert.SerializeObject(cmd, Formatting.Indented);
                Console.WriteLine(theMessage + "\n");

                /// Add end of message and send
                byte[] msg = Encoding.ASCII.GetBytes(theMessage + "<end_of_message>");
                sender.Send(msg);

                /// Get response. The server disconnects after sending its reply,
                /// so read until Receive returns 0 and decode once: decoding each
                /// chunk separately could split a two-byte UTF-16 character
                using (MemoryStream received = new MemoryStream())
                {
                    int bytesRec;
                    while ((bytesRec = sender.Receive(bytes)) > 0)
                    {
                        received.Write(bytes, 0, bytesRec);
                    }
                    theMessage = Encoding.Unicode.GetString(received.ToArray());
                }

                Console.WriteLine("The server reply: {0}\n", theMessage);

                sender.Shutdown(SocketShutdown.Both);
                sender.Close();
            }
            catch (Exception ex)
            {
                Console.WriteLine("Exception: {0}", ex.Message);
            }
        }
    }

    /// <summary>
    /// Generates dummy token
    /// </summary>
    /// <returns>Token string</returns>
    private static string GenerateToken()
    {
        /// Take a single timestamp so all parts come from the same instant
        DateTime now = DateTime.Now;
        byte[] token = Encoding.ASCII.GetBytes(
            now.Hour.ToString() +
            now.Minute.ToString() +
            now.Second.ToString() +
            now.Millisecond.ToString());
        return Convert.ToBase64String(token, 0, token.Length);
    }

    /// <summary>
    /// Prints console help
    /// </summary>
    private static void PrintHelp()
    {
        Console.WriteLine("1 : init");
        Console.WriteLine("2 : say_hi");
        Console.WriteLine("3 : say_bye");
        Console.WriteLine("4 : confirm");
        Console.WriteLine("0 : check_status");
        Console.WriteLine("exit : quit");
        Console.WriteLine();
    }
}
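
The test client above leaves polling to the user (option 0, then option 4). In a real client this would be a loop. Here is a rough sketch of such a loop in standard C++, with the network call hidden behind a hypothetical SendFn callback so the logic can be read on its own. The pollUntilFinished name and the plain "finished" status string are assumptions made for the example; the actual server replies with a JSON object that would have to be parsed first.

```cpp
#include <cassert>
#include <chrono>
#include <functional>
#include <string>
#include <thread>

// Hypothetical transport: sends one command with a token over a fresh
// connection and returns the status reported by the server.
using SendFn = std::function<std::string(const std::string& command,
                                         const std::string& token)>;

// Polls "check_status" until the server reports "finished", then sends
// "confirm" so the server can drop the token. Returns false if the command
// is still not finished after maxAttempts polls.
bool pollUntilFinished(const SendFn& send,
                       const std::string& token,
                       int maxAttempts,
                       std::chrono::milliseconds delay) {
    assert(maxAttempts > 0);
    for (int attempt = 0; attempt < maxAttempts; ++attempt) {
        if (send("check_status", token) == "finished") {
            send("confirm", token);
            return true;
        }
        // Still busy: wait a little before asking again
        std::this_thread::sleep_for(delay);
    }
    return false;
}
```

Every poll is a short request with an immediate reply, so the client never waits on an open connection for minutes, which is exactly the situation that caused the original problem.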

The advantages of this approach:

  1. Client and server each get a prompt acknowledgement from the other: the server replies to every command immediately, and the client acknowledges the result with a "confirm" command
  2. The server is designed with potential support for processing several commands simultaneously: each command runs on its own thread while the main thread handles incoming connections
  3. Because every reply is tied to the token of its command, the client can always tell which command a reply belongs to, so responses cannot be mixed up
  4. The token also lets the client poll the server to track the status of the last command
  5. No response is lost: the server keeps the token in its cache until the client confirms it
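
To make the token life cycle behind points 3 to 5 more concrete, here is a small sketch of a thread-safe token table in standard C++. It is not the token cache used by the server in this post: the TokenTable class, its method names, and the rule that only a finished token can be confirmed are assumptions made for illustration.

```cpp
#include <cassert>
#include <mutex>
#include <string>
#include <unordered_map>

enum class TokenState { Unknown, Busy, Finished };

// Hypothetical token table: one entry per command that has been accepted
// but not yet confirmed by the client.
class TokenTable {
public:
    // Register a token for a new command. Returns false if it is already tracked.
    bool begin(const std::string& token) {
        assert(!token.empty());
        std::lock_guard<std::mutex> lock(mutex_);
        return states_.emplace(token, TokenState::Busy).second;
    }

    // Called by the worker thread when the command is done.
    void finish(const std::string& token) {
        std::lock_guard<std::mutex> lock(mutex_);
        auto it = states_.find(token);
        if (it != states_.end()) {
            it->second = TokenState::Finished;
        }
    }

    // Answer for a "check status" request.
    TokenState status(const std::string& token) const {
        std::lock_guard<std::mutex> lock(mutex_);
        auto it = states_.find(token);
        return it == states_.end() ? TokenState::Unknown : it->second;
    }

    // Answer for a "confirm" request: forget a finished token.
    // Returns false if the token is unknown or still busy (assumed rule).
    bool confirm(const std::string& token) {
        std::lock_guard<std::mutex> lock(mutex_);
        auto it = states_.find(token);
        if (it == states_.end() || it->second != TokenState::Finished) {
            return false;
        }
        states_.erase(it);
        return true;
    }

private:
    mutable std::mutex mutex_;
    std::unordered_map<std::string, TokenState> states_;
};
```

The mutex matters because the table is touched from two sides: the connection callbacks read and remove tokens while the worker thread marks them as finished.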

As usual, the Visual Studio solution and code files can be found in my public SVN repository: http://subversion.assembla.com/svn/max-s-blog-posts/ (SocketServerWithTokens folder).

I hope somebody finds this useful. Thanks and happy socket coding. Max.

About me

My name is Maxim Bovykin and currently I'm working as an R&D Software Engineer and Systems Analyst in a small innovative company. Here is my technical blog, which I started on Monday, February 20, 2012.
