/* This software is licensed by the MIT License, see LICENSE file */
/* Copyright © 2024-2025 Gregory Lirent */

#if !WINIPC_UA_SERVER
using Google.Protobuf;
#else
using Microsoft.Extensions.Options;
using System.Diagnostics;
using WinIPC.Config;
#endif
using System.IO.Pipes;
using WinIPC.Models;
using WinIPC.Utils;

namespace WinIPC.Services
{
    /// <summary>
    /// Shared base for the named-pipe IPC server and client. Holds the pipe name,
    /// the background task, its cancellation source and the current pipe, and
    /// implements the length-prefixed message framing used on the wire.
    /// </summary>
    /// <remarks>
    /// A frame is a 4-byte length (as produced by <see cref="BitConverter.GetBytes(int)"/>)
    /// followed by exactly that many payload bytes.
    /// </remarks>
    public class IPCService : IHostedService, IDisposable
    {
        /// <summary>Size in bytes of the length prefix that precedes every payload.</summary>
        private const int LengthPrefixSize = sizeof(int);

        protected readonly ILogger _logger;
        protected string _name;
        protected Task? _task;
        protected CancellationTokenSource? _cTok;
        protected PipeStream? _pipe;

        /// <summary>
        /// Builds the pipe name for a session.
        /// </summary>
        /// <param name="sessionId">Session id; <c>0</c> selects the bare base name.</param>
        /// <returns>The base name, or <c>"{base}.{sessionId}"</c> for a non-zero id.</returns>
        protected string GetPipeName(int sessionId = 0)
        {
            return sessionId == 0 ? _name : $"{_name}.{sessionId}";
        }

        /// <summary>Logs the start-up message; no background work is started here.</summary>
        /// <param name="cTok">Unused.</param>
        public Task StartAsync(CancellationToken cTok)
        {
            _logger.LogInformation("IPCService is starting...");
            return Task.CompletedTask;
        }

        /// <summary>
        /// Cancels the background task (if any), waits for it to finish and releases
        /// the pipe and cancellation source.
        /// </summary>
        /// <param name="cTok">Unused.</param>
        public async Task StopAsync(CancellationToken cTok)
        {
            _logger.LogInformation("IPCService is stopping...");
            _cTok?.Cancel();

            Task? running = _task;
            if (running != null)
            {
                try
                {
                    await running;
                }
                catch (OperationCanceledException)
                {
                    /* Expected */
                }
                catch (Exception ex)
                {
                    _logger.LogError(ex, "Error while waiting for server listening task to complete.");
                }
            }

            Dispose();
            _logger.LogInformation("IPCService stopped.");
        }

        /// <summary>
        /// Disposes the cancellation source and the current pipe. Safe to call repeatedly.
        /// </summary>
        public void Dispose()
        {
            // Clear the fields so that a later StopAsync()/Dispose() does not call
            // Cancel() on an already-disposed CancellationTokenSource.
            _cTok?.Dispose();
            _cTok = null;
            _pipe?.Dispose();
            _pipe = null;
            GC.SuppressFinalize(this);
        }

        /// <summary>Initializes the service.</summary>
        /// <param name="logger">Logger used for all diagnostics.</param>
        /// <param name="name">Base pipe name.</param>
        /// <exception cref="ArgumentNullException"><paramref name="logger"/> is null.</exception>
        protected IPCService(ILogger logger, string name = "")
        {
            _logger = logger ?? throw new ArgumentNullException(nameof(logger));
            _name = name;
        }

        /// <summary>
        /// Reads one length-prefixed message from <paramref name="pipe"/>.
        /// </summary>
        /// <param name="pipe">Pipe to read from.</param>
        /// <param name="cTok">Cancels the pending read.</param>
        /// <returns>The message payload (never empty).</returns>
        /// <exception cref="ArgumentNullException"><paramref name="pipe"/> is null.</exception>
        /// <exception cref="EndOfStreamException">
        /// The pipe ended before any prefix byte arrived, or in the middle of the payload.
        /// </exception>
        /// <exception cref="IOException">The pipe ended in the middle of the length prefix.</exception>
        /// <exception cref="InvalidDataException">The length prefix is zero or negative.</exception>
        protected async Task<byte[]> ReadMessageAsync(PipeStream pipe, CancellationToken cTok)
        {
            if (pipe == null)
            {
                throw new ArgumentNullException(nameof(pipe));
            }

            // ReadAsync may return fewer bytes than requested, so the prefix is read
            // in a loop exactly like the payload.
            byte[] lengthBuffer = new byte[LengthPrefixSize];
            int bytesRead = await FillBufferAsync(pipe, lengthBuffer, cTok);
            if (bytesRead == 0)
            {
                throw new EndOfStreamException("Pipe closed or no data for length prefix.");
            }
            if (bytesRead < LengthPrefixSize)
            {
                throw new IOException($"Failed to read full 4-byte length prefix. Read {bytesRead} bytes.");
            }

            int messageLength = BitConverter.ToInt32(lengthBuffer, 0);
            if (messageLength <= 0)
            {
                throw new InvalidDataException($"Received invalid message length: {messageLength}.");
            }

            byte[] messageBuffer = new byte[messageLength];
            int totalBytesRead = await FillBufferAsync(pipe, messageBuffer, cTok);
            if (totalBytesRead < messageLength)
            {
                throw new EndOfStreamException($"Pipe closed unexpectedly while reading message payload. Expected {messageLength} bytes, read {totalBytesRead}.");
            }

            return messageBuffer;
        }

        /// <summary>
        /// Reads until <paramref name="buffer"/> is full or the stream reports end of data.
        /// </summary>
        /// <returns>Number of bytes actually stored in <paramref name="buffer"/>.</returns>
        private static async Task<int> FillBufferAsync(PipeStream pipe, byte[] buffer, CancellationToken cTok)
        {
            int filled = 0;
            while (filled < buffer.Length)
            {
                int chunk = await pipe.ReadAsync(buffer, filled, buffer.Length - filled, cTok);
                if (chunk == 0)
                {
                    break;
                }
                filled += chunk;
            }
            return filled;
        }

        /// <summary>
        /// Writes <paramref name="buffer"/> to <paramref name="pipe"/> preceded by its
        /// 4-byte length and flushes the pipe.
        /// </summary>
        /// <param name="pipe">Pipe to write to.</param>
        /// <param name="buffer">Payload bytes.</param>
        /// <param name="cTok">Cancels the pending write.</param>
        /// <returns>
        /// <c>true</c> when the frame was written and flushed; <c>false</c> when the
        /// write was cancelled or failed (the failure is logged, not rethrown).
        /// </returns>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="pipe"/> or <paramref name="buffer"/> is null.
        /// </exception>
        protected async Task<bool> WriteMessageAsync(PipeStream pipe, byte[] buffer, CancellationToken cTok)
        {
            if (pipe == null)
            {
                throw new ArgumentNullException(nameof(pipe));
            }
            if (buffer == null)
            {
                throw new ArgumentNullException(nameof(buffer));
            }

            try
            {
                byte[] lengthPrefix = BitConverter.GetBytes(buffer.Length);
                await pipe.WriteAsync(lengthPrefix, 0, lengthPrefix.Length, cTok);
                await pipe.WriteAsync(buffer, 0, buffer.Length, cTok);
                await pipe.FlushAsync(cTok);
                return true;
            }
            catch (OperationCanceledException)
            {
                _logger.LogInformation("WriteMessageAsync: Operation cancelled.");
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "WriteMessageAsync: Error writing to pipe. Buffer size: {BufferSize} bytes.", buffer.Length);
            }

            return false;
        }
    }

#if WINIPC_UA_SERVER
    /// <summary>
    /// Named-pipe server: accepts client connections on the pipe of the current
    /// process session and answers every request through the registered handler.
    /// </summary>
    public sealed class IPCServerService : IPCService
    {
        /// <summary>Delay before the listen loop retries after a failure.</summary>
        private const int ListenRetryDelayMs = 1000;

        private Func<Request, Task<Response>>? _handler;
        private string _pipeName = String.Empty;

        /// <summary>Creates the server using the base pipe name from configuration.</summary>
        /// <param name="logger">Logger used for all diagnostics.</param>
        /// <param name="appConfigOptions">Options whose value provides <c>IPCService.BasePipeName</c>.</param>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="logger"/> is null, or the options, their value or the base pipe name is null.
        /// </exception>
        // NOTE(review): the generic arguments of ILogger<> and IOptions<> were not
        // visible in the reviewed copy of this file. ILogger<IPCServerService> and
        // IOptions<AppConfig> are assumptions (the latter inferred from the parameter
        // name and the WinIPC.Config import) - confirm against the project.
        public IPCServerService(ILogger<IPCServerService> logger, IOptions<AppConfig> appConfigOptions)
            : base(logger, appConfigOptions?.Value?.IPCService.BasePipeName ?? throw new ArgumentNullException(nameof(appConfigOptions)))
        {
        }

        /// <summary>
        /// Registers the request handler and starts the accept loop on a background task.
        /// </summary>
        /// <param name="handler">Produces the response for each valid request.</param>
        /// <param name="cTok">Cancelling this token stops the accept loop.</param>
        /// <returns>The task running the accept loop.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="handler"/> is null.</exception>
        public Task StartListening(Func<Request, Task<Response>> handler, CancellationToken cTok)
        {
            _handler = handler ?? throw new ArgumentNullException(nameof(handler));
            _cTok = CancellationTokenSource.CreateLinkedTokenSource(cTok);

            // Capture the token now: the _cTok field may be replaced or cleared by
            // the time the thread-pool delegate actually runs.
            CancellationToken token = _cTok.Token;
            _task = Task.Run(() => Listen(token), token);
            return _task;
        }

        /// <summary>
        /// Accept loop: creates a server pipe instance, waits for a client and hands
        /// the connected pipe to <see cref="HandleClientConnectionAsync"/> without
        /// awaiting it, then repeats until cancelled.
        /// </summary>
        private async Task Listen(CancellationToken cTok)
        {
            using (Process current = Process.GetCurrentProcess())
            {
                _pipeName = GetPipeName(current.SessionId);
            }
            _logger.LogInformation("Server: Starting listen loop on pipe '{PipeName}'...", _pipeName);

            while (!cTok.IsCancellationRequested)
            {
                // Instance owned by this iteration until a client handler takes it over.
                NamedPipeServerStream? pipe = null;
                try
                {
                    pipe = new NamedPipeServerStream(
                        _pipeName,
                        PipeDirection.InOut,
                        NamedPipeServerStream.MaxAllowedServerInstances,
                        PipeTransmissionMode.Byte,
                        PipeOptions.Asynchronous);
                    _pipe = pipe;

                    _logger.LogInformation("Server: Waiting for client connection...");
                    await pipe.WaitForConnectionAsync(cTok);
                    _logger.LogInformation("Server: Client connected.");

                    _ = HandleClientConnectionAsync(pipe, cTok);
                    pipe = null; // Ownership moved to the handler, which disposes it.
                }
                catch (OperationCanceledException)
                {
                    _logger.LogInformation("Server: Listen loop cancelled.");
                    pipe?.Dispose();
                    break;
                }
                catch (Exception ex)
                {
                    _logger.LogError(ex, "Server: Error in main listen loop. Disposing current pipe and continuing to next iteration.");

                    // Dispose only the instance created in this iteration. _pipe may
                    // still reference a previous instance that is serving a client.
                    pipe?.Dispose();

                    try
                    {
                        await Task.Delay(ListenRetryDelayMs, cTok);
                    }
                    catch (OperationCanceledException)
                    {
                        _logger.LogInformation("Server: Listen loop cancelled.");
                        break;
                    }
                }
            }

            _logger.LogInformation("Server: Listen loop finished.");
        }

        /// <summary>
        /// Serves one connected client: reads requests and writes responses until the
        /// client disconnects, an error occurs or <paramref name="cTok"/> is cancelled.
        /// Always disposes <paramref name="pipe"/> on exit.
        /// </summary>
        private async Task HandleClientConnectionAsync(NamedPipeServerStream pipe, CancellationToken cTok)
        {
            try
            {
                _logger.LogInformation("Server: Handling connection from pipe '{PipeName}'...", _pipeName);

                while (pipe.IsConnected && !cTok.IsCancellationRequested)
                {
                    byte[]? messageBytes = null;
                    try
                    {
                        messageBytes = await ReadMessageAsync(pipe, cTok);
                    }
                    catch (EndOfStreamException)
                    {
                        _logger.LogInformation("Server: Client disconnected from pipe '{PipeName}'.", _pipeName);
                        break;
                    }
                    catch (OperationCanceledException)
                    {
                        _logger.LogInformation("Server: Message reading cancelled for pipe '{PipeName}'.", _pipeName);
                        break;
                    }
                    catch (Exception ex)
                    {
                        _logger.LogError(ex, "Server: Error reading message from pipe '{PipeName}'.", _pipeName);
                        break;
                    }

                    if (messageBytes == null || messageBytes.Length == 0)
                    {
                        _logger.LogDebug("Server: Received empty or null message from pipe '{PipeName}'. Continuing...", _pipeName);
                        continue;
                    }

                    _logger.LogDebug("Server: Received {ByteCount} bytes from pipe '{PipeName}'.", messageBytes.Length, _pipeName);

                    Response response = await ProcessIncomingRequest(messageBytes);
                    bool writeSuccess = await WriteMessageAsync(pipe, PayloadHandler.Serialize(response), cTok);
                    if (!writeSuccess)
                    {
                        _logger.LogError("Server: Failed to write response to pipe '{PipeName}'.", _pipeName);
                        break;
                    }
                }
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Server: Unhandled exception in client connection handler for pipe '{PipeName}'.", _pipeName);
            }
            finally
            {
                _logger.LogInformation("Server: Client connection handler finished for pipe '{PipeName}'. Disposing pipe.", _pipeName);
                pipe.Dispose();
            }
        }

        /// <summary>
        /// Deserializes a raw message and produces the response to send back.
        /// Failures are converted into error responses instead of being thrown.
        /// </summary>
        /// <param name="messageBytes">Raw payload of one frame.</param>
        /// <returns>The handler's response, or an error response.</returns>
        private async Task<Response> ProcessIncomingRequest(byte[] messageBytes)
        {
            Request request;
            try
            {
                request = PayloadHandler.Deserialize<Request>(messageBytes);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Server: Failed to deserialize incoming message as Request. Returning error response.");
                return PayloadHandler.CreateErrorResponse(0, $"Failed to deserialize request: {ex.Message}");
            }

            bool isValid = request.Id > 0 && request.Type != CommandType.UnknownCommandType;
            if (!isValid)
            {
                _logger.LogWarning("Server: Received invalid Protobuf request (ID or Type missing). Raw bytes: {Bytes}", BitConverter.ToString(messageBytes));
                return PayloadHandler.CreateErrorResponse(request.Id, "Invalid request format.");
            }

            _logger.LogInformation("Server: Received request ID {RequestId}, Type: {CommandType}.", request.Id, request.Type);

            if (_handler == null)
            {
                _logger.LogWarning("Server: No request handler registered for IPCServerService.");
                return PayloadHandler.CreateErrorResponse(request.Id, "No handler registered on server.");
            }

            try
            {
                return await _handler(request);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Server: Error processing request ID {RequestId}, Type {CommandType} by handler.", request.Id, request.Type);
                return PayloadHandler.CreateErrorResponse(request.Id, $"Handler error: {ex.Message}");
            }
        }
    }
#else
    /// <summary>
    /// Named-pipe client base. <see cref="SendRequest(int, Request)"/> starts a request
    /// and stores it as the single pending task; <see cref="GetResponseAsync"/> awaits
    /// that task and returns the response.
    /// </summary>
    public abstract class IPCBaseClientService : IPCService
    {
        /// <summary>Timeout, in milliseconds, for connecting to the server pipe.</summary>
        private const int ConnectTimeoutMs = 5000;

        private uint _id = 0;
        private Response? _resp;

        /// <summary>Initializes the client.</summary>
        /// <param name="logger">Logger used for all diagnostics.</param>
        /// <param name="name">Base pipe name.</param>
        protected IPCBaseClientService(ILogger logger, string name) : base(logger, name)
        {
        }

        /// <summary>
        /// Awaits the pending request and returns its response.
        /// </summary>
        /// <returns>
        /// The response of the pending request, or <c>null</c> when no request is pending.
        /// </returns>
        /// <remarks>
        /// Exceptions from the request propagate to the caller. The pipe is disposed and
        /// the pending task cleared whether the request succeeded or failed.
        /// </remarks>
        public async Task<Response?> GetResponseAsync()
        {
            Task? pending = _task;
            if (pending == null)
            {
                return null;
            }

            try
            {
                await pending;
                return _resp;
            }
            finally
            {
                _pipe?.Dispose();
                _pipe = null;
                _task = null;
            }
        }

        /// <summary>
        /// Connects <paramref name="pipe"/>, writes <paramref name="request"/> and stores
        /// the deserialized reply in <c>_resp</c>.
        /// </summary>
        /// <exception cref="IOException">The request could not be written.</exception>
        private async Task SendRequestAsync(NamedPipeClientStream pipe, Request request)
        {
            _pipe = pipe;
            _resp = null; // Never hand out the previous request's response.

            CancellationToken token = _cTok?.Token ?? CancellationToken.None;

            await pipe.ConnectAsync(ConnectTimeoutMs, token);
            _logger.LogDebug("Connected to pipe!");

            if (!await WriteMessageAsync(pipe, PayloadHandler.Serialize(request), token))
            {
                throw new IOException("Failed to write request into named pipe");
            }

            _resp = PayloadHandler.Deserialize<Response>(await ReadMessageAsync(pipe, token));
        }

        /// <summary>
        /// Starts sending <paramref name="request"/> to the server pipe of
        /// <paramref name="sessionId"/> on the local machine.
        /// </summary>
        /// <param name="sessionId">Session id used to build the pipe name.</param>
        /// <param name="request">Request to send.</param>
        /// <returns>This instance, so that <see cref="GetResponseAsync"/> can be chained.</returns>
        public IPCBaseClientService SendRequest(int sessionId, Request request)
        {
            _task = SendRequestAsync(new NamedPipeClientStream(".", GetPipeName(sessionId), PipeDirection.InOut), request);
            return this;
        }

        /// <summary>
        /// Builds a request with the next sequential id from <paramref name="type"/> and
        /// <paramref name="data"/>, and starts sending it.
        /// </summary>
        /// <typeparam name="T">Message type of the request payload.</typeparam>
        /// <param name="sessionId">Session id used to build the pipe name.</param>
        /// <param name="type">Command type of the request.</param>
        /// <param name="data">Payload message.</param>
        /// <returns>This instance, so that <see cref="GetResponseAsync"/> can be chained.</returns>
        // NOTE(review): the constraint is reproduced as seen (IMessage); if the project
        // declares it as IMessage<T>, restore that form.
        public IPCBaseClientService SendRequest<T>(int sessionId, CommandType type, T data) where T : IMessage
        {
            return SendRequest(sessionId, PayloadHandler.CreateRequest(++_id, type, data));
        }
    }
#endif
}