/* This software is licensed by the MIT License, see LICENSE file */
/* Copyright © 2024-2025 Gregory Lirent */

#if WINIPC_UA_SERVER

using Microsoft.Extensions.Options;
using System.Diagnostics;
using System.IO.Pipes;
using WinIPC.Config;
using WinIPC.Models;
using WinIPC.Utils;

namespace WinIPC.Services
{
    /// <summary>
    /// Hosted service that accepts client connections on a named pipe, reads
    /// length-prefixed request messages, passes them to a registered handler and
    /// writes the serialized response back on the same connection.
    /// </summary>
    /// <remarks>
    /// Wire format (both directions): a 4-byte length produced/consumed with
    /// <see cref="BitConverter"/> (platform byte order), followed by that many payload bytes.
    /// </remarks>
    public class IPCServerService : IHostedService, IDisposable
    {
        /// <summary>Size in bytes of the length prefix that precedes every message.</summary>
        private const int LengthPrefixSize = 4;

        /// <summary>Delay in milliseconds before retrying after a listen-loop failure.</summary>
        private const int ListenRetryDelayMs = 1000;

        private readonly ILogger<IPCServerService> _logger;
        private readonly IPCServiceOptions _options;
        private readonly string _pipeName;

        // Pipe instance that is currently waiting for a client. Once a client connects,
        // ownership moves to the connection handler and this field is cleared.
        private NamedPipeServerStream? _pipe;
        private CancellationTokenSource? _cTok;
        private Task? _task;
        private Func<Request, Task<Response>>? _handler;

        // NOTE(review): the generic type arguments were missing from the reviewed text.
        // ILogger<IPCServerService> follows from the class name; the IOptions<> argument
        // (AppConfig) is an assumption based on the parameter name and the .IPCService
        // property - confirm against WinIPC.Config.
        /// <summary>
        /// Creates the service and derives the pipe name from the configured base name
        /// and the current process session ID.
        /// </summary>
        /// <param name="logger">Logger used for all diagnostics of this service.</param>
        /// <param name="appConfigOptions">Options wrapper whose value exposes the IPC service settings.</param>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="logger"/> is null, or the IPC service settings cannot be resolved
        /// from <paramref name="appConfigOptions"/>.
        /// </exception>
        public IPCServerService(ILogger<IPCServerService> logger, IOptions<AppConfig> appConfigOptions)
        {
            _logger = logger ?? throw new ArgumentNullException(nameof(logger));
            _options = appConfigOptions?.Value?.IPCService ?? throw new ArgumentNullException(nameof(appConfigOptions));

            // Process is IDisposable; release the handle once the session ID has been read.
            int sid;
            using (Process current = Process.GetCurrentProcess())
            {
                sid = current.SessionId;
            }

            if (sid == 0)
            {
                _logger.LogWarning("Server: Failed to get current Session ID (returned 0). Using base pipe name without Session ID suffix.");
                _pipeName = _options.BasePipeName;
            }
            else
            {
                _pipeName = $"{_options.BasePipeName}.{sid}";
                _logger.LogInformation("Server: Appending Session ID {SessionId} to pipe name. Actual pipe name: {ActualPipeName}", sid, _pipeName);
            }

            _logger.LogInformation("IPCServerService initialized. Actual pipe name: {ActualPipeName}", _pipeName);
        }

        /// <summary>
        /// Host start hook. Only logs; listening begins when <see cref="StartListening"/> is called.
        /// </summary>
        /// <param name="cancellationToken">Not used.</param>
        public Task StartAsync(CancellationToken cancellationToken)
        {
            _logger.LogInformation("IPCServerService is starting...");
            return Task.CompletedTask;
        }

        /// <summary>
        /// Cancels the listen loop, waits for it to finish (or for the host to abort the
        /// wait through <paramref name="cancellationToken"/>) and releases resources.
        /// </summary>
        /// <param name="cancellationToken">Signals that shutdown should no longer be graceful.</param>
        public async Task StopAsync(CancellationToken cancellationToken)
        {
            _logger.LogInformation("IPCServerService is stopping...");

            _cTok?.Cancel();

            Task? listenTask = _task;
            if (listenTask is not null)
            {
                try
                {
                    // WaitAsync lets the host's shutdown token cut the wait short.
                    await listenTask.WaitAsync(cancellationToken);
                }
                catch (OperationCanceledException)
                {
                    /* Expected */
                }
                catch (Exception ex)
                {
                    _logger.LogError(ex, "Error while waiting for server listening task to complete.");
                }
            }

            Dispose();
            _logger.LogInformation("IPCServerService stopped.");
        }

        /// <summary>
        /// Disposes the cancellation source and the pipe that is waiting for a client, if any.
        /// Safe to call more than once.
        /// </summary>
        public void Dispose()
        {
            // Swap the fields to null so repeated Dispose/StopAsync calls never touch
            // an already disposed object.
            Interlocked.Exchange(ref _cTok, null)?.Dispose();
            Interlocked.Exchange(ref _pipe, null)?.Dispose();
            GC.SuppressFinalize(this);
        }

        /// <summary>
        /// Registers the request handler and starts the background listen loop.
        /// </summary>
        /// <param name="handler">Callback that produces a response for each valid request.</param>
        /// <param name="cTok">Token that stops the listen loop when cancelled.</param>
        /// <returns>The task running the listen loop.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="handler"/> is null.</exception>
        public Task StartListening(Func<Request, Task<Response>> handler, CancellationToken cTok)
        {
            _handler = handler ?? throw new ArgumentNullException(nameof(handler));

            CancellationTokenSource linked = CancellationTokenSource.CreateLinkedTokenSource(cTok);
            // Capture the token now: reading the field inside the lambda could observe a
            // source that was disposed or replaced before the lambda runs.
            CancellationToken token = linked.Token;
            _cTok = linked;

            Task listenTask = Task.Run(() => ServerListenLoop(token), token);
            _task = listenTask;
            return listenTask;
        }

        /// <summary>
        /// Repeatedly creates a pipe instance, waits for a client and hands the connected
        /// pipe to <see cref="HandleClientConnectionAsync"/> until cancellation is requested.
        /// </summary>
        /// <param name="cTok">Token that ends the loop.</param>
        private async Task ServerListenLoop(CancellationToken cTok)
        {
            _logger.LogInformation("Server: Starting listen loop on pipe '{PipeName}'...", _pipeName);

            while (!cTok.IsCancellationRequested)
            {
                // Local ownership: only a pipe that was never handed to a client handler
                // may be disposed by this loop.
                NamedPipeServerStream? listener = null;
                try
                {
                    listener = new NamedPipeServerStream(
                        _pipeName,
                        PipeDirection.InOut,
                        NamedPipeServerStream.MaxAllowedServerInstances,
                        PipeTransmissionMode.Byte,
                        PipeOptions.Asynchronous);
                    _pipe = listener;

                    _logger.LogInformation("Server: Waiting for client connection...");
                    await listener.WaitForConnectionAsync(cTok);
                    _logger.LogInformation("Server: Client connected.");

                    // Ownership moves to the handler, which disposes the pipe when done.
                    NamedPipeServerStream connected = listener;
                    Interlocked.CompareExchange(ref _pipe, null, connected);
                    listener = null;
                    _ = HandleClientConnectionAsync(connected, cTok);
                }
                catch (OperationCanceledException)
                {
                    _logger.LogInformation("Server: Listen loop cancelled.");
                    ReleasePendingPipe(listener);
                    break;
                }
                catch (Exception ex)
                {
                    _logger.LogError(ex, "Server: Error in main listen loop. Disposing current pipe and continuing to next iteration.");
                    ReleasePendingPipe(listener);

                    try
                    {
                        await Task.Delay(ListenRetryDelayMs, cTok);
                    }
                    catch (OperationCanceledException)
                    {
                        // Cancelled during the back-off; leave the loop normally.
                        _logger.LogInformation("Server: Listen loop cancelled.");
                        break;
                    }
                }
            }

            _logger.LogInformation("Server: Listen loop finished.");
        }

        /// <summary>
        /// Disposes a pipe that never reached a client handler and clears the tracking field
        /// if it still refers to that pipe.
        /// </summary>
        /// <param name="pending">The pipe to release; may be null.</param>
        private void ReleasePendingPipe(NamedPipeServerStream? pending)
        {
            if (pending is null)
            {
                return;
            }

            Interlocked.CompareExchange(ref _pipe, null, pending);
            pending.Dispose();
        }

        /// <summary>
        /// Serves one connected client: reads a message, processes it and writes the response,
        /// until the client disconnects, an error occurs or cancellation is requested.
        /// Always disposes <paramref name="pipe"/> on exit.
        /// </summary>
        /// <param name="pipe">Connected pipe instance owned by this handler.</param>
        /// <param name="cTok">Token that aborts reads and writes.</param>
        private async Task HandleClientConnectionAsync(NamedPipeServerStream pipe, CancellationToken cTok)
        {
            try
            {
                _logger.LogInformation("Server: Handling connection from pipe '{PipeName}'...", _pipeName);

                while (pipe.IsConnected && !cTok.IsCancellationRequested)
                {
                    byte[]? messageBytes = null;
                    try
                    {
                        messageBytes = await ReadMessageAsync(pipe, cTok);
                    }
                    catch (EndOfStreamException)
                    {
                        _logger.LogInformation("Server: Client disconnected from pipe '{PipeName}'.", _pipeName);
                        break;
                    }
                    catch (OperationCanceledException)
                    {
                        _logger.LogInformation("Server: Message reading cancelled for pipe '{PipeName}'.", _pipeName);
                        break;
                    }
                    catch (Exception ex)
                    {
                        _logger.LogError(ex, "Server: Error reading message from pipe '{PipeName}'.", _pipeName);
                        break;
                    }

                    if (messageBytes == null || messageBytes.Length == 0)
                    {
                        _logger.LogDebug("Server: Received empty or null message from pipe '{PipeName}'. Continuing...", _pipeName);
                        continue;
                    }

                    _logger.LogDebug("Server: Received {ByteCount} bytes from pipe '{PipeName}'.", messageBytes.Length, _pipeName);

                    Response response = await ProcessIncomingRequest(messageBytes);

                    bool writeSuccess = await WriteMessageAsync(pipe, PayloadHandler.Serialize(response), cTok);
                    if (!writeSuccess)
                    {
                        _logger.LogError("Server: Failed to write response to pipe '{PipeName}'.", _pipeName);
                        break;
                    }
                }
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Server: Unhandled exception in client connection handler for pipe '{PipeName}'.", _pipeName);
            }
            finally
            {
                _logger.LogInformation("Server: Client connection handler finished for pipe '{PipeName}'. Disposing pipe.", _pipeName);
                pipe.Dispose();
            }
        }

        /// <summary>
        /// Deserializes a raw message into a request, validates it and invokes the registered
        /// handler. Every failure path yields an error response instead of throwing.
        /// </summary>
        /// <param name="messageBytes">Payload bytes of one message (without the length prefix).</param>
        /// <returns>The handler's response, or an error response describing the failure.</returns>
        private async Task<Response> ProcessIncomingRequest(byte[] messageBytes)
        {
            Request request;
            try
            {
                // NOTE(review): an explicit type argument may have been lost in the reviewed
                // text (e.g. Deserialize<Request>) - confirm against PayloadHandler.
                request = PayloadHandler.Deserialize(messageBytes);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Server: Failed to deserialize incoming message as Request. Returning error response.");
                return PayloadHandler.CreateErrorResponse(0, $"Failed to deserialize request: {ex.Message}");
            }

            if (request.Id > 0 && request.Type != CommandType.UnknownCommandType)
            {
                _logger.LogInformation("Server: Received request ID {RequestId}, Type: {CommandType}.", request.Id, request.Type);

                if (_handler != null)
                {
                    try
                    {
                        return await _handler(request);
                    }
                    catch (Exception ex)
                    {
                        _logger.LogError(ex, "Server: Error processing request ID {RequestId}, Type {CommandType} by handler.", request.Id, request.Type);
                        return PayloadHandler.CreateErrorResponse(request.Id, $"Handler error: {ex.Message}");
                    }
                }
                else
                {
                    _logger.LogWarning("Server: No request handler registered for IPCServerService.");
                    return PayloadHandler.CreateErrorResponse(request.Id, "No handler registered on server.");
                }
            }
            else
            {
                _logger.LogWarning("Server: Received invalid Protobuf request (ID or Type missing). Raw bytes: {Bytes}", BitConverter.ToString(messageBytes));
                return PayloadHandler.CreateErrorResponse(request.Id, "Invalid request format.");
            }
        }

        /// <summary>
        /// Reads one length-prefixed message from the pipe.
        /// </summary>
        /// <param name="pipe">Pipe to read from.</param>
        /// <param name="cTok">Token that aborts the read.</param>
        /// <returns>The message payload (never empty).</returns>
        /// <exception cref="EndOfStreamException">
        /// The pipe closed before any prefix byte arrived, or before the full payload arrived.
        /// </exception>
        /// <exception cref="IOException">The pipe closed in the middle of the length prefix.</exception>
        /// <exception cref="InvalidDataException">The length prefix is zero or negative.</exception>
        private async Task<byte[]> ReadMessageAsync(PipeStream pipe, CancellationToken cTok)
        {
            // A single ReadAsync may legally return fewer bytes than requested, so the
            // prefix is accumulated the same way as the payload.
            byte[] lengthBuffer = new byte[LengthPrefixSize];
            int prefixBytes = await ReadUntilFullOrClosedAsync(pipe, lengthBuffer, cTok);

            if (prefixBytes == 0)
            {
                throw new EndOfStreamException("Pipe closed or no data for length prefix.");
            }

            if (prefixBytes < LengthPrefixSize)
            {
                throw new IOException($"Failed to read full 4-byte length prefix. Read {prefixBytes} bytes.");
            }

            int messageLength = BitConverter.ToInt32(lengthBuffer, 0);
            if (messageLength <= 0)
            {
                throw new InvalidDataException($"Received invalid message length: {messageLength}.");
            }

            // NOTE(review): the length comes from the client and has no upper bound here;
            // consider a configurable maximum to avoid very large allocations.
            byte[] messageBuffer = new byte[messageLength];
            int payloadBytes = await ReadUntilFullOrClosedAsync(pipe, messageBuffer, cTok);

            if (payloadBytes < messageLength)
            {
                throw new EndOfStreamException($"Pipe closed unexpectedly while reading message payload. Expected {messageLength} bytes, read {payloadBytes}.");
            }

            return messageBuffer;
        }

        /// <summary>
        /// Reads from the pipe until <paramref name="buffer"/> is full or the pipe reports
        /// end of stream (a read of zero bytes).
        /// </summary>
        /// <param name="pipe">Pipe to read from.</param>
        /// <param name="buffer">Destination buffer; filled from index 0.</param>
        /// <param name="cTok">Token that aborts the read.</param>
        /// <returns>Number of bytes actually stored in <paramref name="buffer"/>.</returns>
        private static async Task<int> ReadUntilFullOrClosedAsync(PipeStream pipe, byte[] buffer, CancellationToken cTok)
        {
            int total = 0;
            while (total < buffer.Length)
            {
                int read = await pipe.ReadAsync(buffer, total, buffer.Length - total, cTok);
                if (read == 0)
                {
                    break;
                }

                total += read;
            }

            return total;
        }

        /// <summary>
        /// Writes one length-prefixed message to the pipe and flushes it.
        /// </summary>
        /// <param name="pipe">Pipe to write to.</param>
        /// <param name="buffer">Message payload.</param>
        /// <param name="cTok">Token that aborts the write.</param>
        /// <returns>
        /// <c>true</c> when prefix and payload were written and flushed;
        /// <c>false</c> when the write was cancelled or failed (the failure is logged).
        /// </returns>
        private async Task<bool> WriteMessageAsync(PipeStream pipe, byte[] buffer, CancellationToken cTok)
        {
            try
            {
                byte[] lengthPrefix = BitConverter.GetBytes(buffer.Length);
                await pipe.WriteAsync(lengthPrefix, 0, lengthPrefix.Length, cTok);
                await pipe.WriteAsync(buffer, 0, buffer.Length, cTok);
                await pipe.FlushAsync(cTok);
                return true;
            }
            catch (OperationCanceledException)
            {
                _logger.LogInformation("WriteMessageAsync: Operation cancelled.");
                return false;
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "WriteMessageAsync: Error writing to pipe. Buffer size: {BufferSize} bytes.", buffer.Length);
                return false;
            }
        }
    }
}

#endif