Although many technologies and approaches exist for network communication, such as Windows Communication Foundation or web servers handling HTTP POST/GET requests, some tasks still call for raw TCP sockets. I have worked with sockets a lot recently, and one of my tasks was an intermediate socket service that relays communication between an automated robot and client software. The first, naïve solution was the following:
1. The client sends a command
2. The socket server receives the command and starts executing it
3. Once command execution is done, the server sends a response to the client
There is nothing wrong with this solution except for one thing: the time between steps 2 and 3 can be very long, up to several minutes. The socket server was written in .NET 4 and the client in Ruby 1.9. When the waiting time exceeded roughly one minute, the client never received the server's response, even though the server had clearly sent it (after some internet research, this appeared to be a Ruby bug).
Here I'll show one possible solution that eliminates this problem and makes the overall design more flexible. We will use a token-based server: the server and the client exchange tokens as a handshake to confirm that every message was delivered correctly.
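The basis for the socket server can be found in the following MSDN articles:
- Using an Asynchronous Server Socket: http://msdn.microsoft.com/en-us/library/5w7b7x5f.aspx
- Asynchronous Server Socket Example: http://msdn.microsoft.com/en-us/library/fx6588te.aspx
We will start from those and build more functionality on top.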
Our scenario will be the following:
1. The client initiates a connection by sending a command together with a token for that command
2. The server receives the command, stores its token in a local token cache, and sends a response immediately
3. The server executes the command on a separate thread
4. The client sends “check status” requests with the token
5. If the command has not finished yet, the server replies that the token is currently “busy”
6. Once the command has finished, the server replies “finished” to the “check status” request
7. The client confirms that the command is finished, and the server cleans up the token cache after the confirmation
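The exchange above can be sketched as a tiny in-memory model. This is plain standard C++ rather than C++/CLI, so that it compiles without the CLR, and it is only an illustration: the names `TokenSlot` and `Handshake`, their methods, and the `"no_token"` reply are hypothetical and are not part of the server code in this post.

```cpp
#include <optional>
#include <string>

// Hypothetical model of the single-token handshake (not the server's real classes).
struct TokenSlot {
    std::string token;
    bool finished = false;
};

class Handshake {
public:
    // Steps 1-2: accept a command only when no token is outstanding.
    bool submit(const std::string& token) {
        if (slot_) return false;  // previous command has not been confirmed yet
        slot_ = TokenSlot{token};
        return true;
    }

    // Step 3: the worker thread would call this when the command is done.
    void finish() {
        if (slot_) slot_->finished = true;
    }

    // Steps 4-6: "busy" until the worker finishes, then "finished".
    std::string check_status() const {
        if (!slot_) return "no_token";  // reply name invented for this sketch
        return slot_->finished ? "finished" : "busy";
    }

    // Step 7: confirmation clears the slot, but only for a finished command.
    bool confirm() {
        if (!slot_ || !slot_->finished) return false;
        slot_.reset();
        return true;
    }

private:
    std::optional<TokenSlot> slot_;
};
```

A caller would `submit` a token, poll `check_status` until it stops returning "busy", and then `confirm`; a second `submit` before the confirmation is rejected, which mirrors the single-command restriction of this prototype.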

The code that I'm providing here is just an example, a prototype, and it has some restrictions:
- Only one command at a time can be registered in the token cache
- If the client sends another command before confirming the previous one, the server ignores it and replies with the status of the previous token
- It supports only dummy commands such as “say hi” and “say bye”, which is sufficient for demo purposes
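To give an idea of how the single-command restriction could be lifted, here is a rough sketch of a cache keyed by token and guarded by a mutex. It is standard C++ rather than C++/CLI, and `MultiTokenCache`, `Status`, and all method names are assumptions made for this illustration, not code from the prototype.

```cpp
#include <mutex>
#include <string>
#include <unordered_map>

// Hypothetical statuses; Unknown means "this token is not tracked".
enum class Status { Busy, Success, Aborted, Unknown };

class MultiTokenCache {
public:
    // Registers a new token as busy; returns false if it is already tracked.
    bool add(const std::string& token) {
        std::lock_guard<std::mutex> lock(mutex_);
        return tokens_.emplace(token, Status::Busy).second;
    }

    // Records the final status of a command; false if the token is unknown.
    bool set_status(const std::string& token, Status status) {
        std::lock_guard<std::mutex> lock(mutex_);
        auto it = tokens_.find(token);
        if (it == tokens_.end()) return false;
        it->second = status;
        return true;
    }

    // Returns the current status, or Status::Unknown for untracked tokens.
    Status get(const std::string& token) const {
        std::lock_guard<std::mutex> lock(mutex_);
        auto it = tokens_.find(token);
        return it == tokens_.end() ? Status::Unknown : it->second;
    }

    // Removes a token on confirmation, but only once it is no longer busy.
    bool confirm(const std::string& token) {
        std::lock_guard<std::mutex> lock(mutex_);
        auto it = tokens_.find(token);
        if (it == tokens_.end() || it->second == Status::Busy) return false;
        tokens_.erase(it);
        return true;
    }

private:
    mutable std::mutex mutex_;
    std::unordered_map<std::string, Status> tokens_;
};
```

Each command then lives under its own key, so a second command no longer has to wait for the first one to be confirmed.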
We will use JSON as the exchange format, together with an awesome JSON library for .NET: http://json.codeplex.com/
The server code is written in C++/CLI and uses CLR classes. It is easy to port to C# (just a matter of syntax in this case).
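For orientation, this is roughly what one request looks like on the wire: a JSON object with `command` and `token` fields, followed by the `<end_of_message>` marker that the listings below use. The sketch is standard C++, the helper names `build_request` and `extract_json` are hypothetical, and it assumes that neither field contains characters that would need JSON escaping.

```cpp
#include <string>

// Builds one request: a JSON object followed by the end-of-message marker.
// Assumption: command and token contain no characters that need JSON escaping.
std::string build_request(const std::string& command, const std::string& token) {
    return "{\"command\":\"" + command + "\",\"token\":\"" + token + "\"}" +
           "<end_of_message>";
}

// Extracts the JSON part the way the server does: everything up to the last '}'.
std::string extract_json(const std::string& message) {
    const std::size_t end = message.rfind('}');
    return end == std::string::npos ? std::string() : message.substr(0, end + 1);
}
```

In the real code the JSON library does the serialization; the point here is only the shape of the message and the role of the marker.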
I. Define the command:
Commands.h
- /// <summary>
- /// Enumeration with supported commands
- /// </summary>
- public enum class CommandsList
- {
- NOT_INITIALIZED = 0,
- INIT,
- SAY_HI,
- SAY_BYE,
- CHECK_STATUS,
- CONFIRM,
-
- UNSUPPORTED = -1
- };
-
- /// <summary>
- /// Serializable to JSON command with token
- /// </summary>
- public ref class Command
- {
- public:
-
- Command();
-
- #pragma region Utils
-
- /// <summary>
- /// Converts string to commands enum
- /// </summary>
- static CommandsList StringToCommand(String ^command);
- /// <summary>
- /// Checks if command is valid to be processed
- /// </summary>
- static bool IsValid(CommandsList cmd);
-
- #pragma endregion
-
- #pragma region Public Properties
-
- /// <summary>
- /// Command name
- /// </summary>
- [JsonProperty]
- property String ^command;
- /// <summary>
- /// Token associated with the command
- /// </summary>
- [JsonProperty]
- property String ^token;
-
- #pragma endregion
-
- private:
-
- #pragma region Private Members
-
- /// <summary>
- /// Stores associations string - commands enum
- /// </summary>
- static Hashtable ^commandsMap;
-
- #pragma endregion
- };
Commands.cpp
- /// Fill up the commands hashtable with name - enum associations
- Command::Command()
- {
- #pragma region Command name to enum mapping
-
- commandsMap = gcnew Hashtable();
- commandsMap["init"] = CommandsList::INIT;
- commandsMap["say_hi"] = CommandsList::SAY_HI;
- commandsMap["say_bye"] = CommandsList::SAY_BYE;
- commandsMap["check_status"] = CommandsList::CHECK_STATUS;
- commandsMap["confirm"] = CommandsList::CONFIRM;
-
- #pragma endregion
- }
-
- /// If command name is null or was not found in the hashtable, return unsupported
- CommandsList Command::StringToCommand(String ^command)
- {
- /// The map is filled by the constructor, so it may still be nullptr here;
- /// Hashtable lookups also throw on a null key
- if (nullptr != commandsMap && nullptr != command &&
- commandsMap->ContainsKey(command))
- {
- return safe_cast<CommandsList>(commandsMap[command]);
- }
- return CommandsList::UNSUPPORTED;
- }
-
- /// Command is not valid if it is unsupported or not initialized
- bool Command::IsValid(CommandsList cmd)
- {
- return (CommandsList::NOT_INITIALIZED != cmd) &&
- (CommandsList::UNSUPPORTED != cmd);
- }
II. Define the commands processor:
CommandsProcessor.h
- /// <summary>
- /// Class responsible for every command process
- /// </summary>
- public ref class CommandsProcessor
- {
- public:
-
- CommandsProcessor();
-
- /// <summary>
- /// Chooses which command to process
- /// </summary>
- /// <param name="command">Command object</param>
- /// <param name="tokenCache">Token cache object</param>
- void ProcessCommand(Command ^command, TokenCache^ tokenCache);
-
- private:
-
- #pragma region Every Command Processor
-
- /// Individual processors for every command
-
- bool _isInitialized;
- void ProcessInit(Command ^command, TokenCache^ tokenCache);
-
- void ProcessHi(Command ^command, TokenCache^ tokenCache);
- void ProcessBye(Command ^command, TokenCache^ tokenCache);
-
- #pragma endregion
-
- #pragma region Exceptions Handlers
-
- /// Handlers for exceptions to store and to include in service reply
-
- void HandleManagedException(Exception^ exception);
- void EmptyExceptionMessage();
-
- ServiceException^ _exception;
-
- #pragma endregion
- };
CommandsProcessor.cpp
- /// Constructor
- CommandsProcessor::CommandsProcessor()
- {
- _isInitialized = false;
- _exception = gcnew ServiceException();
- }
-
- /// Process command, emulate processing for this demo
- void CommandsProcessor::ProcessCommand(Command ^command, TokenCache ^tokenCache)
- {
- //TODO: EMULATION ONLY
- Console::WriteLine("--SLEEP--\n");
- Thread::Sleep(10000);
- Console::WriteLine("--WAKE UP--\n");
- /////////////////
-
- /// Parse command and process it
- CommandsList cmd = command->StringToCommand(command->command);
- switch (cmd)
- {
- case CommandsList::INIT:
- {
- ProcessInit(command, tokenCache);
- break;
- }
- case CommandsList::SAY_HI:
- {
- ProcessHi(command, tokenCache);
- break;
- }
- case CommandsList::SAY_BYE:
- {
- ProcessBye(command, tokenCache);
- break;
- }
- case CommandsList::CHECK_STATUS:
- case CommandsList::CONFIRM:
- default:
- {
- break;
- }
- }
-
- /// We already caught all exceptions in every command processor
- EmptyExceptionMessage();
- }
-
- #pragma region Every Command Processor
-
- #pragma region Init
- void CommandsProcessor::ProcessInit(Command ^command, TokenCache ^tokenCache)
- {
- try
- {
- /// TODO: Insert specific processing here
- _isInitialized = true;
- }
- catch (Exception ^exception)
- {
- /// Store exception in order to include it in report
- HandleManagedException(exception);
- }
-
- /// Form JSON formatted reply
- JObject ^o = gcnew JObject(
- gcnew JProperty("command", command->command),
- gcnew JProperty("exception_message", _exception->_exceptionMessage),
- gcnew JProperty("is_init", _isInitialized));
-
- /// Set status and report in the token cache
- tokenCache->SetTokenResponse(command, o);
- tokenCache->SetTokenStatus(command,
- (_exception->_isException) ?
- TOKEN_ABORTED : TOKEN_SUCCESS,
- true);
- tokenCache->WriteTokenFile(tokenCache->GetToken(command));
-
- }
- #pragma endregion
-
- #pragma region Hi
- void CommandsProcessor::ProcessHi(Command ^command, TokenCache ^tokenCache)
- {
- try
- {
- /// TODO: Insert specific processing here
- }
- catch (Exception ^exception)
- {
- /// Store exception in order to include it in report
- HandleManagedException(exception);
- }
-
- /// Form JSON formatted reply
- JObject ^o = gcnew JObject(
- gcnew JProperty("command", command->command),
- gcnew JProperty("exception_message", _exception->_exceptionMessage),
- gcnew JProperty("message", "Hi!"));
-
- /// Set status and report in the token cache
- tokenCache->SetTokenResponse(command, o);
- tokenCache->SetTokenStatus(command,
- (_exception->_isException) ?
- TOKEN_ABORTED : TOKEN_SUCCESS,
- true);
- tokenCache->WriteTokenFile(tokenCache->GetToken(command));
-
- }
- #pragma endregion
-
- #pragma region Bye
- void CommandsProcessor::ProcessBye(Command ^command, TokenCache ^tokenCache)
- {
- try
- {
- /// TODO: Insert specific processing here
- }
- catch (Exception ^exception)
- {
- /// Store exception in order to include it in report
- HandleManagedException(exception);
- }
-
- /// Form JSON formatted reply
- JObject ^o = gcnew JObject(
- gcnew JProperty("command", command->command),
- gcnew JProperty("exception_message", _exception->_exceptionMessage),
- gcnew JProperty("message", "Bye!"));
-
- /// Set status and report in the token cache
- tokenCache->SetTokenResponse(command, o);
- tokenCache->SetTokenStatus(command,
- (_exception->_isException) ?
- TOKEN_ABORTED : TOKEN_SUCCESS,
- true);
- tokenCache->WriteTokenFile(tokenCache->GetToken(command));
-
- }
- #pragma endregion
-
- #pragma endregion
-
- #pragma region Exceptions Handlers
-
- /// Store exception in order to include it in report
- void CommandsProcessor::HandleManagedException(Exception^ exception)
- {
- _exception->_isException = true;
- _exception->_exceptionMessage = gcnew String(exception->Message);
- }
-
- /// Clean up stored exception
- void CommandsProcessor::EmptyExceptionMessage()
- {
- _exception->_isException = false;
- _exception->_exceptionMessage = String::Empty;
- }
-
- #pragma endregion
Here we use a helper class to store exceptions:
Exceptions.h
- /// <summary>
- /// Exceptions helper: stores exception to include in reply
- /// </summary>
- public ref class ServiceException
- {
- public:
- ServiceException() :
- _isException(false),
- _exceptionMessage(nullptr)
- { }
-
- bool _isException;
- String^ _exceptionMessage;
- };
III. Define the local token cache, which stores the current command's status and info.
This class keeps the token status in memory and also on the file system for emergencies (for example, the computer is powered off before the token has been confirmed).
TokenCache.h
- /// <summary>
- /// Enumeration with token statuses
- /// </summary>
- public enum TokenStatuses
- {
- TOKEN_BUSY = 0,
- TOKEN_SUCCESS,
- TOKEN_ABORTED,
- TOKEN_BAD_COMMAND = -1
- };
-
- /// <summary>
- /// Helper class: converts token status from enum to string
- /// </summary>
- public ref class TokenConverter
- {
- public:
- static String^ TokenStatusToString(TokenStatuses status);
- };
-
- /// <summary>
- /// Serializable to JSON token status
- /// </summary>
- public ref class TokenStatus
- {
- public:
-
- #pragma region Public Properties
-
- /// <summary>
- /// Token associated with the command
- /// </summary>
- [JsonProperty]
- property String ^token;
- /// <summary>
- /// Token status
- /// </summary>
- [JsonProperty]
- property String ^status;
- /// <summary>
- /// Is command processed?
- /// </summary>
- [JsonProperty]
- property bool finished;
- /// <summary>
- /// JSON response for current command
- /// </summary>
- [JsonProperty]
- property JObject ^response;
-
- #pragma endregion
-
- TokenStatus(String ^_token);
- };
-
- /// <summary>
- /// Token Cache class responsible for Set/Get/Remove current token
- /// Limitations of this version: only one token could be set up
- /// But easily extendable to an unlimited number of tokens
- /// </summary>
- public ref class TokenCache
- {
- public:
-
- #pragma region Token Operations
-
- /// Token operations: Set/Get/Remove
-
- static TokenStatus ^GetToken(Command^ cmd);
- static bool SetToken(Command^ cmd, TokenStatus^ token);
- static bool SetTokenStatus(Command^ cmd, TokenStatuses status, bool finished);
- static bool SetTokenResponse(Command^ cmd, JObject ^response);
- static bool RemoveToken(Command^ cmd);
-
- /// Store last operation in a token file for emergency cases
-
- static bool CheckTokenFile([Out] TokenStatus ^%token);
- static void WriteTokenFile(TokenStatus ^token);
- static void DeleteTokenFile();
-
- #pragma endregion
-
- private:
-
- #pragma region Private Members
-
- /// <summary>
- /// Stores associations token - status
- /// </summary>
- static Dictionary<String^, TokenStatus^> ^_tokens
- = gcnew Dictionary <String^, TokenStatus^>();
-
- /// If new token came, check for previous token
- static String^ prevToken = String::Empty;
-
- #pragma endregion
- };
TokenCache.cpp
- /// Token status constructor, token is busy by default
- TokenStatus::TokenStatus(String^ _token)
- {
- token = _token;
- status = TokenConverter::TokenStatusToString(TOKEN_BUSY);
- finished = false;
- response = nullptr;
- }
-
- /// Gets token
- /// Locks tokens dictionary
- /// If previous token is not empty, use it and ignore new token
- TokenStatus^ TokenCache::GetToken(Command^ cmd)
- {
- TokenStatus ^t = nullptr;
-
- System::Threading::Monitor::Enter(_tokens);
- try
- {
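- if (!String::IsNullOrEmpty(prevToken))
- {
- /// A command is already in progress: ignore the new token
- _tokens->TryGetValue(prevToken, t);
- }
- else if (!String::IsNullOrEmpty(cmd->token))
- {
- /// Dictionary lookups throw on a null key, so skip empty tokens
- _tokens->TryGetValue(cmd->token, t);
- }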
- }
- finally
- {
- System::Threading::Monitor::Exit(_tokens);
- }
-
- return t;
- }
-
- /// Removes token
- /// Locks tokens dictionary
- /// Removes current token from cache if it finished
- bool TokenCache::RemoveToken(Command^ cmd)
- {
- bool res = false;
- TokenStatus ^t = nullptr;
-
- System::Threading::Monitor::Enter(_tokens);
- try
- {
- if (String::Empty == prevToken)
- {
- if (_tokens->TryGetValue(cmd->token, t))
- {
- if (t->finished)
- {
- _tokens->Remove(cmd->token);
- res = true;
- }
- }
- }
- else
- {
- if (_tokens->TryGetValue(prevToken, t))
- {
- if (t->finished)
- {
- _tokens->Remove(prevToken);
- prevToken = String::Empty;
- res = true;
- }
- }
- }
- }
- finally
- {
- System::Threading::Monitor::Exit(_tokens);
- }
-
- return res;
- }
-
- /// Sets token
- /// Locks tokens dictionary
- /// If no current tokens in progress, sets new token
- bool TokenCache::SetToken(Command^ cmd, TokenStatus^ token)
- {
- bool res = false;
-
- System::Threading::Monitor::Enter(_tokens);
- try
- {
- if (String::Empty == prevToken)
- {
- if (!_tokens->ContainsKey(cmd->token))
- {
- _tokens[cmd->token] = token;
- prevToken = cmd->token;
- res = true;
- }
- }
- }
- finally
- {
- System::Threading::Monitor::Exit(_tokens);
- }
-
- return res;
- }
-
- /// Sets status for current token
- /// Locks tokens dictionary
- bool TokenCache::SetTokenStatus(Command^ cmd, TokenStatuses status, bool finished)
- {
- bool res = false;
-
- System::Threading::Monitor::Enter(_tokens);
- try
- {
- TokenStatus ^t = nullptr;
- if (_tokens->TryGetValue(cmd->token, t))
- {
- t->status = TokenConverter::TokenStatusToString(status);
- t->finished = finished;
- _tokens[cmd->token] = t;
- res = true;
- }
- }
- finally
- {
- System::Threading::Monitor::Exit(_tokens);
- }
-
- return res;
- }
-
- /// Sets response for current token
- /// Locks tokens dictionary
- bool TokenCache::SetTokenResponse(Command^ cmd, JObject ^response)
- {
- bool res = false;
-
- System::Threading::Monitor::Enter(_tokens);
- try
- {
- TokenStatus ^t = nullptr;
- if (_tokens->TryGetValue(cmd->token, t))
- {
- t->response = response;
- res = true;
- }
- }
- finally
- {
- System::Threading::Monitor::Exit(_tokens);
- }
-
- return res;
- }
-
- /// Recovers token from isolated file cache
- bool TokenCache::CheckTokenFile([Out] TokenStatus ^%token)
- {
- bool fileExists = false;
- token = gcnew TokenStatus(String::Empty);
-
- try
- {
- IsolatedStorageFile ^isoFile = IsolatedStorageFile::GetMachineStoreForAssembly();
- IsolatedStorageFileStream ^isoStream = gcnew IsolatedStorageFileStream("token.last", FileMode::Open, isoFile);
-
- String ^tokenData = nullptr;
- try
- {
- StreamReader ^sr = gcnew StreamReader(isoStream);
- try
- {
- tokenData = sr->ReadToEnd();
- }
- finally
- {
- delete sr;
- }
- fileExists = true;
- }
- catch (...) { }
- if (!String::IsNullOrEmpty(tokenData))
- {
- try
- {
- token = JsonConvert::DeserializeObject<TokenStatus^>(tokenData);
- }
- catch (...) { }
- }
- isoFile->Close();
- }
- catch (...) { }
-
- return fileExists;
- }
-
- /// Backups token to isolated file cache
- void TokenCache::WriteTokenFile(TokenStatus ^token)
- {
- try
- {
- IsolatedStorageFile ^isoFile = IsolatedStorageFile::GetMachineStoreForAssembly();
- IsolatedStorageFileStream ^isoStream = gcnew IsolatedStorageFileStream("token.last", FileMode::Create, FileAccess::Write, isoFile);
-
- try
- {
- JsonSerializer ^serializer = gcnew JsonSerializer();
- StreamWriter ^sw = gcnew StreamWriter(isoStream);
- try
- {
- JsonWriter ^writer = gcnew JsonTextWriter(sw);
- try
- {
- serializer->Serialize(writer, token);
- }
- finally
- {
- delete writer;
- }
- }
- finally
- {
- delete sw;
- }
- }
- catch (...) { }
- /// Dispose the store once; no Close() call is needed after delete
- delete isoFile;
- }
- catch (...) { }
- }
-
- /// Cleanups isolated tokens file
- void TokenCache::DeleteTokenFile()
- {
- try
- {
- IsolatedStorageFile ^isoFile = IsolatedStorageFile::GetMachineStoreForAssembly();
- isoFile->DeleteFile("token.last");
- /// Dispose the store once; no Close() call is needed after delete
- delete isoFile;
- }
- catch(...) { }
-
- }
-
- /// Converts token status to string
- String^ TokenConverter::TokenStatusToString(TokenStatuses status)
- {
- String^ retStr = String::Empty;
-
- switch(status)
- {
- case TOKEN_BUSY:
- retStr = "busy";
- break;
- case TOKEN_SUCCESS:
- retStr = "success";
- break;
- case TOKEN_ABORTED:
- retStr = "aborted";
- break;
- case TOKEN_BAD_COMMAND:
- default:
- retStr = "bad_command";
- break;
- }
-
- return retStr;
- }
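The backup idea behind `CheckTokenFile`, `WriteTokenFile`, and `DeleteTokenFile` can be sketched in a few lines of standard C++. This is a simplified stand-in under stated assumptions: it uses an ordinary file instead of isolated storage and a two-line text format instead of JSON, and the names `SavedToken`, `save_token`, `load_token`, and `delete_token` are hypothetical.

```cpp
#include <cstdio>
#include <fstream>
#include <optional>
#include <string>

// Hypothetical on-disk record: one line for the token, one for its status.
struct SavedToken {
    std::string token;
    std::string status;
};

// Overwrites the backup file with the latest token state.
bool save_token(const std::string& path, const SavedToken& t) {
    std::ofstream out(path, std::ios::trunc);
    out << t.token << '\n' << t.status << '\n';
    return static_cast<bool>(out);
}

// Returns the saved token, or nothing if the file is missing or incomplete.
std::optional<SavedToken> load_token(const std::string& path) {
    std::ifstream in(path);
    SavedToken t;
    if (!std::getline(in, t.token) || !std::getline(in, t.status)) {
        return std::nullopt;
    }
    return t;
}

// Called after the client confirms; returns true if a file was removed.
bool delete_token(const std::string& path) {
    return std::remove(path.c_str()) == 0;
}
```

On startup the server would call `load_token` first; if it returns a value, an unconfirmed result survived the restart and can still be reported to the client.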
IV. Finally, define the threaded command processor and the socket server itself:
SocketServer.h
- /// <summary>
- /// Threaded object responsible for just one command processing
- /// </summary>
- public ref class SingleCommandProcessor
- {
- public:
-
- /// <summary>
- /// Constructor
- /// </summary>
- /// <param name="currCmd">Command to process</param>
- /// <param name="tokenCache">Token cache object</param>
- /// <param name="commandsProcessor">Commands processor object</param>
- SingleCommandProcessor(
- Command^ currCmd,
- TokenCache ^tokenCache,
- CommandsProcessor ^commandsProcessor) :
- _currCmd(currCmd),
- _tokenCache(tokenCache),
- _commandsProcessor(commandsProcessor)
- {}
-
- /// <summary>
- /// Thread function to process current command
- /// </summary>
- void DoProcess();
-
- private:
- Command ^_currCmd;
- TokenCache ^_tokenCache;
- CommandsProcessor ^_commandsProcessor;
- };
-
- /// <summary>
- /// State object for reading client data asynchronously
- /// </summary>
- public ref class StateObject
- {
- public:
- StateObject();
-
- Socket ^workSocket;
- literal int BufferSize = 1024;
- array<Byte> ^buffer;
- StringBuilder ^sb;
-
- private:
- bool _initialized;
- void InitializeInstanceFields();
- };
-
- /// <summary>
- /// Socket Server exactly as described on MSDN articles:
- ///
- /// Using an Asynchronous Server Socket:
- /// http://msdn.microsoft.com/en-us/library/5w7b7x5f.aspx
- ///
- /// Asynchronous Server Socket Example
- /// http://msdn.microsoft.com/en-us/library/fx6588te.aspx
- /// </summary>
- public ref class SocketServer
- {
- public:
- SocketServer(String ^ipAddress, int port);
-
- void Run(String ^ipAddress, int port, int backlog);
- void Stop();
-
- private:
- void AcceptCallback(IAsyncResult ^ar);
- void ReceiveCallback(IAsyncResult ^ar);
- void SendCallback(IAsyncResult ^ar);
-
- SocketPermission ^_permission;
- Socket ^_sListener;
- static ManualResetEvent ^allDone = gcnew ManualResetEvent(false);
-
- CommandsProcessor ^_commandsProcessor;
- TokenCache ^_tokenCache;
- };
The code for the asynchronous callbacks is the same as in the MSDN examples (links are above), except for the receive callback. Its logic is the following:
1. Parse the command
2. If the command is valid, check for tokens
3. If a token is pending (not yet confirmed), reply with the current token
4. If there is no current token, reply with a new token
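The decision made by the receive callback can be written down as one small pure function, which makes the branches easier to see than in the callback itself. This is a standard C++ sketch with hypothetical names (`Cmd`, `Reply`, `Cached`, `decide`). One choice here is an assumption of the sketch rather than something stated in the list above: a "check status" or "confirm" request that arrives when nothing is cached is answered as a bad command instead of starting new work.

```cpp
// Hypothetical, simplified view of the incoming command.
enum class Cmd { Init, SayHi, SayBye, CheckStatus, Confirm, Unsupported };

// What the server should do in response.
enum class Reply { BadCommand, CurrentToken, ConfirmOk, ConfirmRejected, NewToken };

// State of the single cached token, as seen by the receive callback.
struct Cached {
    bool present = false;   // is there a token in the cache?
    bool finished = false;  // has its command finished?
};

Reply decide(Cmd cmd, const Cached& cached) {
    if (cmd == Cmd::Unsupported) return Reply::BadCommand;

    if (cached.present) {
        // Confirmation succeeds only for a finished command.
        if (cmd == Cmd::Confirm) {
            return cached.finished ? Reply::ConfirmOk : Reply::ConfirmRejected;
        }
        // Anything else gets the status of the token already in the cache.
        return Reply::CurrentToken;
    }

    // Nothing cached: only a real command may start new work (sketch assumption).
    if (cmd == Cmd::CheckStatus || cmd == Cmd::Confirm) return Reply::BadCommand;
    return Reply::NewToken;
}
```

Only the `NewToken` outcome leads to a new processing thread; every other outcome is answered straight from the cache.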
SocketServer.cpp
- /// State object
- StateObject::StateObject()
- {
- InitializeInstanceFields();
- }
-
- /// Initialize state object
- void StateObject::InitializeInstanceFields()
- {
- if ( !_initialized)
- {
- buffer = gcnew array<Byte>(BufferSize);
- sb = gcnew StringBuilder();
-
- _initialized = true;
- }
- }
-
- /// Thread function for single command processing
- void SingleCommandProcessor::DoProcess()
- {
- _commandsProcessor->ProcessCommand(_currCmd, _tokenCache);
- }
-
- /// Socket Server constructor
- SocketServer::SocketServer(String ^ipAddress, int port)
- {
- _permission = gcnew SocketPermission(PermissionState::Unrestricted);
- _commandsProcessor = gcnew CommandsProcessor();
- _tokenCache = gcnew TokenCache();
-
- _sListener = nullptr;
- }
-
- /// Run Socket Server
- void SocketServer::Run(String ^ipAddress, int port, int backlog)
- {
- _permission->Demand();
-
- IPAddress ^ipAddr = IPAddress::Parse(ipAddress);
- IPEndPoint ^ipEndPoint = gcnew IPEndPoint(ipAddr, port);
-
- _sListener = gcnew Socket(ipAddr->AddressFamily, SocketType::Stream, ProtocolType::Tcp);
- _sListener->Bind(ipEndPoint);
- _sListener->Listen(backlog);
-
- while (true) {
- allDone->Reset();
-
- Console::WriteLine("Waiting for a connection on " +
- Convert::ToString(ipEndPoint->Address) + ":" +
- Convert::ToString(ipEndPoint->Port));
-
- AsyncCallback ^aCallback = gcnew AsyncCallback(this, &SocketServer::AcceptCallback);
- _sListener->BeginAccept(aCallback, _sListener);
-
- allDone->WaitOne();
- }
- }
-
- /// Stop Socket Server
- void SocketServer::Stop()
- {
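- /// A listening socket is never "connected", so test for nullptr instead;
- /// Shutdown() applies only to connected sockets and would throw here
- if (_sListener != nullptr)
- {
- _sListener->Close();
- _sListener = nullptr;
- }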
- }
-
- /// Accept incoming socket connection
- void SocketServer::AcceptCallback(IAsyncResult ^ar)
- {
- Socket ^listener = nullptr;
- Socket ^handler = nullptr;
-
- allDone->Set();
-
- listener = safe_cast<Socket^>(ar->AsyncState);
- handler = listener->EndAccept(ar);
-
- StateObject ^state = gcnew StateObject();
- state->workSocket = handler;
- handler->BeginReceive(
- state->buffer,
- 0,
- StateObject::BufferSize,
- SocketFlags::None,
- gcnew AsyncCallback(this, &SocketServer::ReceiveCallback),
- state);
- }
-
- /// Receive incoming socket data
- void SocketServer::ReceiveCallback(IAsyncResult ^ar)
- {
- bool startNewProcessingThread = false;
-
- StateObject ^state = safe_cast<StateObject^>(ar->AsyncState);
- Socket ^handler = state->workSocket;
-
- String ^content = String::Empty;
- String ^response = String::Empty;
-
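- int bytesRead = handler->EndReceive(ar);
- if (bytesRead > 0)
- {
- state->sb->Append(Encoding::ASCII->GetString(state->buffer, 0, bytesRead));
- }
-
- /// Search the accumulated text, not just the last chunk: the terminator
- /// may arrive split across two reads
- bool messageComplete = state->sb->ToString()->Contains("<end_of_message>");
- if (!messageComplete && bytesRead > 0)
- {
- /// Message is not complete yet, keep reading
- handler->BeginReceive(
- state->buffer,
- 0,
- StateObject::BufferSize,
- SocketFlags::None,
- gcnew AsyncCallback(this, &SocketServer::ReceiveCallback),
- state);
- }
- if (messageComplete)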
- {
- if (state->sb->Length > 0)
- {
- Command ^command;
- content = state->sb->ToString();
-
- Console::WriteLine("Read " +
- Convert::ToString(content->Length) +
- " bytes from client.\n Data: " +
- content + "\n");
-
- /*
- 1) Parse command
- 2) If valid command check for tokens
- 3) If token not finished => reply with current token
- 4) If no current tokens => reply with new token
- */
-
- CommandsList cmd = CommandsList::NOT_INITIALIZED;
- try
- {
- /// Get JSON string from received message
- String ^json = content->Substring(0, content->LastIndexOf("}") + 1);
- /// Deserialize to command
- command = JsonConvert::DeserializeObject<Command^>(json);
- /// Convert command name to commands enum
- cmd = command->StringToCommand(command->command);
- }
- catch (...) { }
-
- /// Check if received command is valid
- if (Command::IsValid(cmd))
- {
- TokenStatus ^t;
-
- /// Check if we have file-cached tokens
- if(!_tokenCache->CheckTokenFile(t))
- {
- /// Get token from token cache if no file-cached tokens
- t = _tokenCache->GetToken(command);
- }
-
- // If we have cached token
- if (nullptr != t)
- {
- /// If command is confirmation, remove token from cache
- if (CommandsList::CONFIRM == cmd)
- {
- bool tokenDone = _tokenCache->RemoveToken(command);
- if (tokenDone)
- {
- _tokenCache->DeleteTokenFile();
- }
-
- /// Reply with status of confirmation
- response = (gcnew JObject(
- gcnew JProperty("status",
- TokenConverter::TokenStatusToString(
- (tokenDone ? TOKEN_SUCCESS : TOKEN_ABORTED)))))->ToString();
- }
- /// Serialize response from token cache
- else
- {
- response = JsonConvert::SerializeObject(t, Newtonsoft::Json::Formatting::Indented);
- }
- }
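- /// Status and confirm requests need an existing token: with nothing
- /// cached there is no command to report on, so reply "bad_command"
- else if (CommandsList::CHECK_STATUS == cmd ||
- CommandsList::CONFIRM == cmd ||
- String::IsNullOrEmpty(command->token))
- {
- response = (gcnew JObject(
- gcnew JProperty("status",
- TokenConverter::TokenStatusToString(TOKEN_BAD_COMMAND))))->ToString();
- }
- /// Create new token if we don't have any
- else
- {
- t = gcnew TokenStatus(command->token);
- _tokenCache->SetToken(command, t);
- startNewProcessingThread = true;
- response = JsonConvert::SerializeObject(t, Newtonsoft::Json::Formatting::Indented);
- }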
- }
- else
- {
- /// Got unsupported command, reply
- JObject ^o = gcnew JObject(
- gcnew JProperty("status", TokenConverter::TokenStatusToString(TOKEN_BAD_COMMAND)));
-
- response = o->ToString();
- }
-
- array<Byte> ^byteData = Encoding::Unicode->GetBytes(response);
-
- Console::WriteLine("Sending: \n" +
- response + "\n");
-
- handler->BeginSend(
- byteData,
- 0,
- byteData->Length,
- SocketFlags::None,
- gcnew AsyncCallback(this, &SocketServer::SendCallback),
- handler);
-
- state->sb->Clear();
-
- /// If we didn't have active tokens and got a new command
- /// Process it in a separate thread
- if (startNewProcessingThread)
- {
- SingleCommandProcessor^ threadWork =
- gcnew SingleCommandProcessor(command, _tokenCache, _commandsProcessor);
- Thread^ newThread = gcnew Thread( gcnew ThreadStart( threadWork, &SingleCommandProcessor::DoProcess ) );
- newThread->Start();
- }
- }
- }
- }
-
- /// Send data to client
- void SocketServer::SendCallback(IAsyncResult ^ar)
- {
- Socket ^handler = safe_cast<Socket^>(ar->AsyncState);
- int bytesSend = handler->EndSend(ar);
-
- Console::WriteLine("Sent " +
- Convert::ToString(bytesSend) +
- " bytes to client." + "\n");
-
- handler->Disconnect(true);
- }
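A TCP read can end anywhere, including in the middle of the `<end_of_message>` marker, so the marker has to be looked for in the accumulated text rather than in a single chunk. The following standard C++ sketch shows that idea in isolation; `MessageAssembler` and `feed` are hypothetical names, and the class is a model of the buffering only, not a replacement for the callback above.

```cpp
#include <optional>
#include <string>

// Hypothetical helper that collects chunks until the marker has been seen.
class MessageAssembler {
public:
    // Appends one received chunk. Returns the complete message (without the
    // marker) once the marker has been seen, otherwise nothing.
    std::optional<std::string> feed(const std::string& chunk) {
        buffer_ += chunk;
        const std::size_t pos = buffer_.find(kMarker);
        if (pos == std::string::npos) return std::nullopt;

        std::string message = buffer_.substr(0, pos);
        buffer_.erase(0, pos + kMarker.size());  // keep any bytes after the marker
        return message;
    }

private:
    inline static const std::string kMarker = "<end_of_message>";
    std::string buffer_;
};
```

Feeding `...<end_of_` and then `message>` yields nothing on the first call and the full message on the second, which is exactly the case a per-chunk check would miss.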
V. To test the server, here is a very simple client written in C#:
Client.cs
- /// <summary>
- /// Serializable to JSON command with token
- /// </summary>
- public class Command
- {
- [JsonProperty]
- public String command;
-
- [JsonProperty]
- public String token;
- };
-
- class Client
- {
- static void Main(string[] args)
- {
- PrintHelp();
-
- byte[] bytes = new byte[1024];
- string line = String.Empty;
-
- Command cmd = new Command();
-
- /// Read user command to send to server
- while (true)
- {
- line = Console.ReadLine();
- if (line == "exit")
- {
- break;
- }
-
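- /// Non-numeric input falls through to the default branch instead of throwing
- int option;
- if (!Int32.TryParse(line, out option))
- {
- option = -1;
- }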
- switch (option)
- {
- case 0:
- cmd.command = "check_status";
- break;
- case 1:
- cmd.command = "init";
- cmd.token = GenerateToken();
- break;
- case 2:
- cmd.command = "say_hi";
- cmd.token = GenerateToken();
- break;
- case 3:
- cmd.command = "say_bye";
- cmd.token = GenerateToken();
- break;
- case 4:
- cmd.command = "confirm";
- break;
- default:
- PrintHelp();
- cmd.command = "echo";
- break;
- }
-
- try
- {
- SocketPermission permission = new SocketPermission(PermissionState.Unrestricted);
- permission.Demand();
- IPAddress ipAddr = IPAddress.Parse("127.0.0.1");
- IPEndPoint ipEndPoint = new IPEndPoint(ipAddr, 8081);
- Socket sender = new Socket(
- ipAddr.AddressFamily,
- SocketType.Stream,
- ProtocolType.Tcp
- );
-
- sender.Connect(ipEndPoint);
- Console.WriteLine("Socket connected to {0}",
- sender.RemoteEndPoint.ToString());
-
- /// Serialize command
- string theMessage = JsonConvert.SerializeObject(cmd, Formatting.Indented);
- Console.WriteLine(theMessage + "\n");
-
- /// Add end of message and send
- byte[] msg = Encoding.ASCII.GetBytes(theMessage + "<end_of_message>");
- int bytesSend = sender.Send(msg);
-
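- /// Get response: the server closes the connection after replying,
- /// so read until Receive returns 0 and decode the whole buffer at once
- /// (decoding chunk by chunk could split a two-byte UTF-16 character)
- System.IO.MemoryStream received = new System.IO.MemoryStream();
- int bytesRec;
- while ((bytesRec = sender.Receive(bytes)) > 0)
- {
- received.Write(bytes, 0, bytesRec);
- }
- theMessage = Encoding.Unicode.GetString(received.ToArray());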
-
- Console.WriteLine("The server reply: {0}\n", theMessage);
-
- sender.Shutdown(SocketShutdown.Both);
- sender.Close();
- }
- catch (Exception ex)
- {
- Console.WriteLine("Exception: {0}", ex.Message);
- }
- }
- }
-
- /// <summary>
- /// Generates dummy token
- /// </summary>
- /// <returns>Token string</returns>
- private static string GenerateToken()
- {
- /// Take one timestamp and zero-pad every field ("HHmmssfff"), so two
- /// different moments of the day cannot yield the same digit string
- byte[] token = Encoding.ASCII.GetBytes(DateTime.Now.ToString("HHmmssfff"));
- return Convert.ToBase64String(token);
- }
-
- /// <summary>
- /// Prints console help
- /// </summary>
- private static void PrintHelp()
- {
- Console.WriteLine("1 : init");
- Console.WriteLine("2 : say_hi");
- Console.WriteLine("3 : say_bye");
- Console.WriteLine("4 : confirm");
- Console.WriteLine("0 : check_status");
- Console.WriteLine();
- }
- }
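The console client leaves polling to the person typing the options. An automated client would loop instead: send the command, poll with "check_status" while the reply says "busy", then send "confirm". Here is a minimal standard C++ sketch of that loop. The `Transport` callback, `run_command`, the `max_polls` limit, and the `"timeout"` result are all hypothetical; a real transport would open a socket, send the JSON request, and return the `status` field of the reply.

```cpp
#include <functional>
#include <string>

// Hypothetical transport: sends one command with a token and returns the
// server's "status" field ("busy", "success", "aborted", ...).
using Transport = std::function<std::string(const std::string& command,
                                            const std::string& token)>;

// Submits a command, polls until it leaves the "busy" state, then confirms.
// Returns the final status, or "timeout" after max_polls status checks.
std::string run_command(const Transport& send, const std::string& command,
                        const std::string& token, int max_polls) {
    std::string status = send(command, token);
    for (int i = 0; status == "busy" && i < max_polls; ++i) {
        // A real client would sleep between polls.
        status = send("check_status", token);
    }
    if (status == "busy") return "timeout";

    send("confirm", token);  // lets the server clear its token cache
    return status;
}
```

Because each poll is a short request with an immediate reply, the client never sits on one connection for minutes, which is the situation that caused the original problem.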
The advantages of this approach:
- Client and server always get a response from each other: every request is answered immediately, so neither side waits minutes on a silent connection
- The server is designed with potential support for processing several commands simultaneously: each command has its own thread, while the main thread handles incoming connections
- With the token-based approach, every response is tied to the command it answers, so responses cannot be mixed up
- Also thanks to the tokens, the client can poll the server to track the status of the last command
- No response is lost: the result stays in the token cache until the client confirms it
As usual, the Visual Studio solution and code files can be found in my public SVN repository: http://subversion.assembla.com/svn/max-s-blog-posts/ (SocketServerWithTokens folder).
I hope somebody finds this useful. Thanks, and happy socket coding. Max.