winipc-ua/Services/IPCService.cs
2025-08-03 13:58:46 +03:00

326 lines
12 KiB
C#

/* This software is licensed by the MIT License, see LICENSE file */
/* Copyright © 2024-2025 Gregory Lirent */
#if !WINIPC_UA_SERVER
using Google.Protobuf;
#else
using Microsoft.Extensions.Options;
using System.Diagnostics;
using WinIPC.Config;
#endif
using System.IO.Pipes;
using WinIPC.Models;
using WinIPC.Utils;
namespace WinIPC.Services
{
public class IPCService : IHostedService, IDisposable
{
protected readonly ILogger _logger;
protected string _name;
protected Task? _task;
protected CancellationTokenSource? _cTok;
protected PipeStream? _pipe;
protected string GetPipeName(int sessionId = 0)
{
if (sessionId == 0)
return _name;
return $"{_name}.{sessionId}";
}
public Task StartAsync(CancellationToken cTok)
{
_logger.LogInformation("IPCService is starting...");
return Task.CompletedTask;
}
public async Task StopAsync(CancellationToken cTok)
{
_logger.LogInformation("IPCService is stopping...");
_cTok?.Cancel();
if (_task != null)
{
try
{
await _task;
}
catch (OperationCanceledException) { /* Expected */ }
catch (Exception ex) { _logger.LogError(ex, "Error while waiting for server listening task to complete."); }
}
Dispose();
_logger.LogInformation("IPCService stopped.");
}
public void Dispose()
{
_cTok?.Dispose();
_pipe?.Dispose();
GC.SuppressFinalize(this);
}
protected IPCService(ILogger logger, string name = "")
{
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_name = name;
}
protected async Task<byte[]> ReadMessageAsync(PipeStream pipe, CancellationToken cTok)
{
byte[] lengthBuffer = new byte[4];
int bytesRead;
bytesRead = await pipe.ReadAsync(lengthBuffer, 0, 4, cTok);
if (bytesRead == 0) throw new EndOfStreamException("Pipe closed or no data for length prefix.");
if (bytesRead < 4) throw new IOException($"Failed to read full 4-byte length prefix. Read {bytesRead} bytes.");
int messageLength = BitConverter.ToInt32(lengthBuffer, 0);
if (messageLength <= 0) throw new InvalidDataException($"Received invalid message length: {messageLength}.");
byte[] messageBuffer = new byte[messageLength];
int totalBytesRead = 0;
while (totalBytesRead < messageLength)
{
bytesRead = await pipe.ReadAsync(messageBuffer, totalBytesRead, messageLength - totalBytesRead, cTok);
if (bytesRead == 0) throw new EndOfStreamException($"Pipe closed unexpectedly while reading message payload. Expected {messageLength} bytes, read {totalBytesRead}.");
totalBytesRead += bytesRead;
}
return messageBuffer;
}
protected async Task<bool> WriteMessageAsync(PipeStream pipe, byte[] buffer, CancellationToken cTok)
{
try
{
byte[] lengthPrefix = BitConverter.GetBytes(buffer.Length);
await pipe.WriteAsync(lengthPrefix, 0, lengthPrefix.Length, cTok);
await pipe.WriteAsync(buffer, 0, buffer.Length, cTok);
await pipe.FlushAsync(cTok);
return true;
}
catch (OperationCanceledException)
{
_logger.LogInformation("WriteMessageAsync: Operation cancelled.");
}
catch (Exception ex)
{
_logger.LogError(ex, $"WriteMessageAsync: Error writing to pipe. Buffer size: {buffer.Length} bytes.");
}
return false;
}
}
#if WINIPC_UA_SERVER
public sealed class IPCServerService : IPCService
{
private Func<Request, Task<Response>>? _handler;
private string _pipeName = String.Empty;
public IPCServerService(ILogger<IPCServerService> logger, IOptions<AppConfig> appConfigOptions)
:base(logger, appConfigOptions?.Value?.IPCService.BasePipeName ?? throw new ArgumentNullException(nameof(appConfigOptions)))
{ }
public Task StartListening(Func<Request, Task<Response>> handler, CancellationToken cTok)
{
_handler = handler ?? throw new ArgumentNullException(nameof(handler));
_cTok = CancellationTokenSource.CreateLinkedTokenSource(cTok);
_task = Task.Run(() => Listen(_cTok.Token), _cTok.Token);
return _task;
}
private async Task Listen(CancellationToken cTok)
{
_pipeName = GetPipeName(Process.GetCurrentProcess().SessionId);
_logger.LogInformation($"Server: Starting listen loop on pipe '{_pipeName}'...");
while (!cTok.IsCancellationRequested)
{
try
{
var pipe = new NamedPipeServerStream(
_pipeName,
PipeDirection.InOut,
NamedPipeServerStream.MaxAllowedServerInstances,
PipeTransmissionMode.Byte,
PipeOptions.Asynchronous);
_pipe = pipe;
_logger.LogInformation("Server: Waiting for client connection...");
await pipe.WaitForConnectionAsync(cTok);
_logger.LogInformation("Server: Client connected.");
_ = HandleClientConnectionAsync(pipe, cTok);
}
catch (OperationCanceledException)
{
_logger.LogInformation("Server: Listen loop cancelled.");
_pipe?.Dispose();
break;
}
catch (Exception ex)
{
_logger.LogError(ex, "Server: Error in main listen loop. Disposing current pipe and continuing to next iteration.");
_pipe?.Dispose();
await Task.Delay(1000, cTok);
}
}
_logger.LogInformation("Server: Listen loop finished.");
}
private async Task HandleClientConnectionAsync(NamedPipeServerStream pipe, CancellationToken cTok)
{
try
{
_logger.LogInformation($"Server: Handling connection from pipe '{_pipeName}'...");
while (pipe.IsConnected && !cTok.IsCancellationRequested)
{
byte[]? messageBytes = null;
try
{
messageBytes = await ReadMessageAsync(pipe, cTok);
}
catch (EndOfStreamException)
{
_logger.LogInformation($"Server: Client disconnected from pipe '{_pipeName}'.");
break;
}
catch (OperationCanceledException)
{
_logger.LogInformation($"Server: Message reading cancelled for pipe '{_pipeName}'.");
break;
}
catch (Exception ex)
{
_logger.LogError(ex, $"Server: Error reading message from pipe '{_pipeName}'.");
break;
}
if (messageBytes == null || messageBytes.Length == 0)
{
_logger.LogDebug($"Server: Received empty or null message from pipe '{_pipeName}'. Continuing...");
continue;
}
_logger.LogDebug($"Server: Received {messageBytes.Length} bytes from pipe '{_pipeName}'.");
Response response = await ProcessIncomingRequest(messageBytes);
bool writeSuccess = await WriteMessageAsync(pipe, PayloadHandler.Serialize(response), cTok);
if (!writeSuccess)
{
_logger.LogError($"Server: Failed to write response to pipe '{_pipeName}'.");
break;
}
}
}
catch (Exception ex)
{
_logger.LogError(ex, $"Server: Unhandled exception in client connection handler for pipe '{_pipeName}'.");
}
finally
{
_logger.LogInformation($"Server: Client connection handler finished for pipe '{_pipeName}'. Disposing pipe.");
pipe.Dispose();
}
}
private async Task<Response> ProcessIncomingRequest(byte[] messageBytes)
{
Request request;
try
{
request = PayloadHandler.Deserialize<Request>(messageBytes);
}
catch (Exception ex)
{
_logger.LogError(ex, "Server: Failed to deserialize incoming message as Request. Returning error response.");
return PayloadHandler.CreateErrorResponse(0, $"Failed to deserialize request: {ex.Message}");
}
if (request.Id > 0 && request.Type != CommandType.UnknownCommandType)
{
_logger.LogInformation("Server: Received request ID {RequestId}, Type: {CommandType}.", request.Id, request.Type);
if (_handler != null)
{
try
{
return await _handler(request);
}
catch (Exception ex)
{
_logger.LogError(ex, $"Server: Error processing request ID {request.Id}, Type {request.Type} by handler.");
return PayloadHandler.CreateErrorResponse(request.Id, $"Handler error: {ex.Message}");
}
}
else
{
_logger.LogWarning("Server: No request handler registered for IPCServerService.");
return PayloadHandler.CreateErrorResponse(request.Id, "No handler registered on server.");
}
}
else
{
_logger.LogWarning("Server: Received invalid Protobuf request (ID or Type missing). Raw bytes: {Bytes}", BitConverter.ToString(messageBytes));
return PayloadHandler.CreateErrorResponse(request.Id, "Invalid request format.");
}
}
}
#else
public abstract class IPCBaseClientService : IPCService
{
private uint _id = 0;
private Response? _resp;
protected IPCBaseClientService(ILogger logger, string name)
: base(logger, name) { }
public async Task<Response?> GetResponseAsync()
{
if (_task == null)
return null;
await _task;
_pipe?.Dispose();
_pipe = null;
_task = null;
return _resp;
}
private async Task SendRequestAsync(NamedPipeClientStream pipe, Request request)
{
_pipe = pipe;
await pipe.ConnectAsync(5000);
Console.WriteLine("Connected to pipe!");
if (await WriteMessageAsync(pipe, PayloadHandler.Serialize(request), _cTok?.Token ?? CancellationToken.None))
{
_resp = PayloadHandler.Deserialize<Response>(await ReadMessageAsync(pipe, _cTok?.Token ?? CancellationToken.None));
}
else
{
throw new IOException("Failed to write request into named pipe");
}
}
public IPCBaseClientService SendRequest(int sessionId, Request request)
{
_task = SendRequestAsync(new NamedPipeClientStream(".", GetPipeName(sessionId), PipeDirection.InOut), request);
return this;
}
public IPCBaseClientService SendRequest<T>(int sessionId, CommandType type, T data) where T : IMessage<T>
{
return SendRequest(sessionId, PayloadHandler.CreateRequest(++_id, type, data));
}
}
#endif
}