SiLA2.IPC.NetMQ
10.2.2
dotnet add package SiLA2.IPC.NetMQ --version 10.2.2
NuGet\Install-Package SiLA2.IPC.NetMQ -Version 10.2.2
<PackageReference Include="SiLA2.IPC.NetMQ" Version="10.2.2" />
<PackageVersion Include="SiLA2.IPC.NetMQ" Version="10.2.2" />
<PackageReference Include="SiLA2.IPC.NetMQ" />
paket add SiLA2.IPC.NetMQ --version 10.2.2
#r "nuget: SiLA2.IPC.NetMQ, 10.2.2"
#:package SiLA2.IPC.NetMQ@10.2.2
#addin nuget:?package=SiLA2.IPC.NetMQ&version=10.2.2
#tool nuget:?package=SiLA2.IPC.NetMQ&version=10.2.2
SiLA2.IPC.NetMQ
Inter-Process Communication via ZeroMQ for SiLA2 .NET Implementation
| NuGet Package | SiLA2.IPC.NetMQ on NuGet.org |
| Repository | https://gitlab.com/SiLA2/sila_csharp |
| ZeroMQ Homepage | https://zeromq.org |
| NetMQ (C# Port) | https://github.com/zeromq/netmq |
| SiLA Standard | https://sila-standard.com |
| License | MIT |
Overview
SiLA2.IPC.NetMQ is an optional module for the SiLA2 .NET implementation that provides lightweight, high-performance inter-process communication (IPC) using ZeroMQ/NetMQ. This module enables SiLA2 servers to communicate with other processes, microservices, or distributed components using the proven Request/Reply messaging pattern.
This library is particularly useful for:
- Distributed SiLA2 architectures where controllers coordinate worker processes
- Microservices communication between SiLA2 components
- Process isolation for safety-critical operations
- Message passing between laboratory automation systems
- Command distribution to worker processes or remote devices
Key Features
- Request/Reply Pattern - Synchronous message exchange with guaranteed delivery
- Multiple Transports - TCP (network), IPC (local processes), inproc (threads)
- Simple API - Clean interface abstractions for server and client
- Custom Message Processing - Configurable request handlers with full control
- Single and Multipart Messages - Support for simple and complex data structures
- Timeout Control - Configurable response timeouts with exception handling
- Fire-and-Forget Mode - Optional one-way messaging without waiting for responses
- Automatic Resource Management - Proper socket disposal and cleanup
- Logging Integration - Microsoft.Extensions.Logging support
Relationship to SiLA2.Core
SiLA2.IPC.NetMQ is an optional module that complements the primary gRPC-based communication in SiLA2.Core. While gRPC handles client-server communication for the SiLA2 protocol, NetMQ is ideal for:
- Sidecar communication - Coordinating with helper processes
- Internal messaging - Communication between server components
- Lightweight IPC - Faster local process communication than gRPC
- Pub/Sub scenarios - Future support for broadcast patterns (not yet implemented)
- Legacy integration - Connecting to ZeroMQ-based systems
When to use NetMQ vs gRPC:
- Use gRPC for: SiLA2 client-server communication, feature implementation, network protocols
- Use NetMQ for: Process-to-process messaging, microservices coordination, command distribution
Introduction to ZeroMQ
ZeroMQ (also written as ØMQ or 0MQ) is a high-performance asynchronous messaging library that provides socket-like abstractions for common messaging patterns. Unlike traditional message brokers (RabbitMQ, Kafka), ZeroMQ is:
- Brokerless - No central message broker, direct peer-to-peer communication
- Fast - Minimal overhead, optimized for low latency and high throughput
- Simple - Socket-like API that feels familiar to network programmers
- Flexible - Works across threads, processes, and machines
- Polyglot - Supports 40+ programming languages with identical semantics
NetMQ is the native C# port of ZeroMQ, providing the same capabilities without requiring native C++ libraries.
Request/Reply Pattern
This module implements ZeroMQ's Request/Reply (REQ/REP) pattern, which provides:
Client (REQ) Server (REP)
| |
|---- Request ------> |
| | (Process request)
| <---- Response ---- |
| |
Characteristics:
- Strict alternation - Client sends, then receives; server receives, then sends
- Synchronous - Client blocks waiting for response
- Reliable - Messages are guaranteed to arrive or fail with exception
- Simple - Natural request/response workflow
Transport Types
ZeroMQ supports multiple transport protocols:
| Transport | Format | Use Case | Example |
|---|---|---|---|
| TCP | tcp://host:port |
Network communication | tcp://192.168.1.100:5555 |
| IPC | ipc://path |
Local process communication (Unix) | ipc:///tmp/myservice |
| Inproc | inproc://name |
In-process thread communication | inproc://workers |
Note: IPC transport is not available on Windows. Use TCP with localhost for local communication on Windows.
Installation
Install via NuGet Package Manager:
dotnet add package SiLA2.IPC.NetMQ
Or via Package Manager Console:
Install-Package SiLA2.IPC.NetMQ
Prerequisites
- .NET 10.0 or later
- NetMQ 4.0.2.2+ (automatically installed as dependency)
- Microsoft.Extensions.Logging 10.0.2+ (for logging support)
Quick Start
Get up and running with NetMQ in 3 minutes:
1. Create an Echo Server
using SiLA2.IPC.NetMQ;
using Microsoft.Extensions.Logging;
// Setup logging
var loggerFactory = LoggerFactory.Create(builder => builder.AddConsole());
var serverLogger = loggerFactory.CreateLogger<ZeroMqServer>();
// Configure server (binds to all interfaces on port 5555)
var serverConfig = new ZeroMqServerConfig("tcp://*:5555");
// Create and start server
var server = new ZeroMqServer(serverConfig, serverLogger);
// Run server in background
var cts = new CancellationTokenSource();
var serverTask = Task.Run(() => server.Run(cts.Token));
Console.WriteLine("Echo server running on tcp://*:5555");
Console.WriteLine("Press Enter to stop...");
Console.ReadLine();
// Stop server gracefully
cts.Cancel();
await serverTask;
2. Create a Client
using SiLA2.IPC.NetMQ;
using Microsoft.Extensions.Logging;
// Setup logging
var loggerFactory = LoggerFactory.Create(builder => builder.AddConsole());
var clientLogger = loggerFactory.CreateLogger<ZeroMqClient>();
// Configure client (connect to server)
var clientConfig = new ZeroMqClientConfig(
serverSocketAddress: "tcp://localhost:5555",
timeout: TimeSpan.FromSeconds(5)
);
// Create client
var client = new ZeroMqClient(clientConfig, clientLogger);
// Send message and receive response
var response = client.Send("Hello, ZeroMQ!");
Console.WriteLine($"Server response: {response}");
// Output: Server response: Message from Server : 'Hello, ZeroMQ!'
That's it! You now have a working client-server communication system.
Core Components
Architecture Overview
┌──────────────────────────────────────────────────┐
│ SiLA2.IPC.NetMQ │
├──────────────────────────────────────────────────┤
│ │
│ ┌──────────────┐ ┌──────────────┐ │
│ │ IZeroMq │ │ IZeroMq │ │
│ │ Server │◄─────────────┤ Client │ │
│ └──────────────┘ Request └──────────────┘ │
│ │ /Response │ │
│ │ │ │
│ ▼ ▼ │
│ ┌──────────────┐ ┌──────────────┐ │
│ │ ZeroMqServer│ │ ZeroMqClient │ │
│ │ (ResponseSkt)│ │ (RequestSkt) │ │
│ └──────────────┘ └──────────────┘ │
│ │ │ │
│ │ │ │
│ ▼ ▼ │
│ ┌──────────────┐ ┌──────────────┐ │
│ │IZeroMqServer │ │IZeroMqClient │ │
│ │ Config │ │ Config │ │
│ └──────────────┘ └──────────────┘ │
│ │
└──────────────────────────────────────────────────┘
│
▼
┌──────────────────┐
│ NetMQ │
│ (ZeroMQ Port) │
└──────────────────┘
Interfaces
IZeroMqServer
High-level server abstraction for handling client requests.
public interface IZeroMqServer
{
// Start server and process requests until cancellation
Task Run(CancellationToken cancellationToken = default);
// Set custom request processing logic at runtime
void SetRequestProcessing(Func<NetMQMessage, string> requestProcessing);
}
IZeroMqClient
High-level client abstraction for sending messages to servers.
public interface IZeroMqClient
{
// Get client configuration
IZeroMqClientConfig ZeroMqClientConfig { get; }
// Send single-frame string message
string Send(string message);
// Send multipart NetMQ message
string Send(NetMQMessage netMQMessage);
}
IZeroMqServerConfig
Configuration for server socket binding and message processing.
public interface IZeroMqServerConfig
{
// Server binding address (e.g., "tcp://*:5555")
string ServerSocketAddress { get; }
// Custom request processing delegate
Func<NetMQMessage, string> ProcessRequest { get; set; }
}
IZeroMqClientConfig
Configuration for client connections and behavior.
public interface IZeroMqClientConfig
{
// Server connection address
string ServerSocketAddress { get; }
// Response timeout
TimeSpan Timeout { get; }
// Use single-frame messages (vs multipart)
bool UseSingleFrame { get; set; }
// Wait for server response (vs fire-and-forget)
bool ExpectServerResponse { get; set; }
}
Implementations
- ZeroMqServer - Production-ready server using NetMQ's ResponseSocket
- ZeroMqClient - Production-ready client using NetMQ's RequestSocket
- ZeroMqServerConfig - Default server configuration with echo behavior
- ZeroMqClientConfig - Default client configuration
Server Configuration
Creating a Server with Default Echo Behavior
using SiLA2.IPC.NetMQ;
using Microsoft.Extensions.Logging;
var logger = loggerFactory.CreateLogger<ZeroMqServer>();
// Server echoes received messages back to clients
var config = new ZeroMqServerConfig("tcp://*:5555");
var server = new ZeroMqServer(config, logger);
var cts = new CancellationTokenSource();
await server.Run(cts.Token);
Creating a Server with Custom Request Processing
using SiLA2.IPC.NetMQ;
using System.Text.Json;
// Define custom request handler
Func<NetMQMessage, string> customHandler = (message) =>
{
// Extract message content
var frame = message[0];
var requestText = frame.ConvertToString();
// Process request (example: uppercase transformation)
var response = requestText.ToUpper();
return response;
};
// Create server with custom handler
var config = new ZeroMqServerConfig("tcp://*:5555", customHandler);
var server = new ZeroMqServer(config, logger);
// Or set handler after creation
server.SetRequestProcessing(customHandler);
await server.Run(CancellationToken.None);
Socket Address Formats
TCP (Network Communication):
// Bind to all interfaces on port 5555
var config = new ZeroMqServerConfig("tcp://*:5555");
// Bind to specific interface
var config = new ZeroMqServerConfig("tcp://192.168.1.100:5555");
// Bind to localhost only
var config = new ZeroMqServerConfig("tcp://127.0.0.1:5555");
IPC (Unix/Linux Local Processes):
// Unix domain socket (Linux/macOS only)
var config = new ZeroMqServerConfig("ipc:///tmp/myservice");
// Note: IPC not available on Windows - use TCP with localhost instead
Inproc (In-Process Threading):
// Communication between threads in same process
var config = new ZeroMqServerConfig("inproc://workers");
Running the Server with Cancellation
// Run server in background with cancellation support
var cts = new CancellationTokenSource();
var serverTask = Task.Run(async () =>
{
try
{
await server.Run(cts.Token);
}
catch (Exception ex)
{
Console.WriteLine($"Server error: {ex.Message}");
}
});
// Stop after 60 seconds
await Task.Delay(TimeSpan.FromSeconds(60));
cts.Cancel();
// Wait for graceful shutdown
await serverTask;
Console.WriteLine("Server stopped");
Client Configuration
Creating a Client
using SiLA2.IPC.NetMQ;
var logger = loggerFactory.CreateLogger<ZeroMqClient>();
// Configure client with 5-second timeout
var config = new ZeroMqClientConfig(
serverSocketAddress: "tcp://localhost:5555",
timeout: TimeSpan.FromSeconds(5)
);
var client = new ZeroMqClient(config, logger);
// Send message
var response = client.Send("Hello from client");
Console.WriteLine($"Response: {response}");
Timeout Configuration
// Short timeout for low-latency systems
var config = new ZeroMqClientConfig(
"tcp://localhost:5555",
timeout: TimeSpan.FromMilliseconds(500)
);
// Long timeout for slow operations
var config = new ZeroMqClientConfig(
"tcp://localhost:5555",
timeout: TimeSpan.FromSeconds(30)
);
// Send with timeout handling
try
{
var response = client.Send("Request");
}
catch (NetMQException ex)
{
Console.WriteLine($"Timeout or error: {ex.Message}");
}
Single-Frame vs Multipart Messages
Single-Frame (Default):
var config = new ZeroMqClientConfig("tcp://localhost:5555", TimeSpan.FromSeconds(5))
{
UseSingleFrame = true // Default
};
var client = new ZeroMqClient(config, logger);
// Sends as single frame
var response = client.Send("Simple message");
Multipart Messages:
using NetMQ;
var config = new ZeroMqClientConfig("tcp://localhost:5555", TimeSpan.FromSeconds(5))
{
UseSingleFrame = false
};
var client = new ZeroMqClient(config, logger);
// Create multipart message
var message = new NetMQMessage();
message.Append("Header");
message.Append("Body");
message.Append("Footer");
var response = client.Send(message);
Synchronous Request/Response Pattern
// Standard synchronous pattern (default)
var config = new ZeroMqClientConfig("tcp://localhost:5555", TimeSpan.FromSeconds(5))
{
ExpectServerResponse = true // Default
};
var client = new ZeroMqClient(config, logger);
// Blocks until response received or timeout
var response = client.Send("Request");
Console.WriteLine($"Got response: {response}");
Usage Examples
Example 1: Echo Server and Client
Server:
using SiLA2.IPC.NetMQ;
using Microsoft.Extensions.Logging;
class Program
{
static async Task Main(string[] args)
{
var loggerFactory = LoggerFactory.Create(builder => builder.AddConsole());
var logger = loggerFactory.CreateLogger<ZeroMqServer>();
// Echo server - returns messages unchanged
var config = new ZeroMqServerConfig("tcp://*:5555");
var server = new ZeroMqServer(config, logger);
Console.WriteLine("Echo server started on tcp://*:5555");
Console.WriteLine("Press Ctrl+C to stop");
var cts = new CancellationTokenSource();
Console.CancelKeyPress += (s, e) =>
{
e.Cancel = true;
cts.Cancel();
};
await server.Run(cts.Token);
}
}
Client:
using SiLA2.IPC.NetMQ;
using Microsoft.Extensions.Logging;
class Program
{
static void Main(string[] args)
{
var loggerFactory = LoggerFactory.Create(builder => builder.AddConsole());
var logger = loggerFactory.CreateLogger<ZeroMqClient>();
var config = new ZeroMqClientConfig(
"tcp://localhost:5555",
TimeSpan.FromSeconds(5));
var client = new ZeroMqClient(config, logger);
// Interactive echo client
while (true)
{
Console.Write("Enter message (or 'quit'): ");
var input = Console.ReadLine();
if (input?.ToLower() == "quit")
break;
try
{
var response = client.Send(input);
Console.WriteLine($"Server: {response}");
}
catch (Exception ex)
{
Console.WriteLine($"Error: {ex.Message}");
}
}
}
}
Example 2: Custom Message Processing (JSON)
Server with JSON Processing:
using SiLA2.IPC.NetMQ;
using System.Text.Json;
// Define request and response models
public class CommandRequest
{
public string Command { get; set; }
public Dictionary<string, string> Parameters { get; set; }
}
public class CommandResponse
{
public bool Success { get; set; }
public string Result { get; set; }
public string Error { get; set; }
}
class Program
{
static async Task Main(string[] args)
{
var logger = loggerFactory.CreateLogger<ZeroMqServer>();
// Custom JSON request processor
Func<NetMQMessage, string> jsonProcessor = (message) =>
{
try
{
// Parse JSON request
var requestJson = message[0].ConvertToString();
var request = JsonSerializer.Deserialize<CommandRequest>(requestJson);
// Process command
var result = ExecuteCommand(request.Command, request.Parameters);
// Return JSON response
var response = new CommandResponse
{
Success = true,
Result = result
};
return JsonSerializer.Serialize(response);
}
catch (Exception ex)
{
var errorResponse = new CommandResponse
{
Success = false,
Error = ex.Message
};
return JsonSerializer.Serialize(errorResponse);
}
};
var config = new ZeroMqServerConfig("tcp://*:5555", jsonProcessor);
var server = new ZeroMqServer(config, logger);
await server.Run(CancellationToken.None);
}
static string ExecuteCommand(string command, Dictionary<string, string> parameters)
{
return command switch
{
"GetStatus" => "System is running",
"GetTemperature" => "25.5°C",
"SetParameter" => $"Set {parameters["name"]} = {parameters["value"]}",
_ => "Unknown command"
};
}
}
Client with JSON:
using SiLA2.IPC.NetMQ;
using System.Text.Json;
var config = new ZeroMqClientConfig("tcp://localhost:5555", TimeSpan.FromSeconds(5));
var client = new ZeroMqClient(config, logger);
// Send JSON request
var request = new CommandRequest
{
Command = "GetTemperature",
Parameters = new Dictionary<string, string>()
};
var requestJson = JsonSerializer.Serialize(request);
var responseJson = client.Send(requestJson);
// Parse JSON response
var response = JsonSerializer.Deserialize<CommandResponse>(responseJson);
Console.WriteLine($"Success: {response.Success}, Result: {response.Result}");
Example 3: Multipart Message Handling
Server Processing Multipart:
Func<NetMQMessage, string> multipartProcessor = (message) =>
{
var parts = new List<string>();
foreach (var frame in message)
{
parts.Add(frame.ConvertToString());
}
return $"Received {parts.Count} parts: {string.Join(", ", parts)}";
};
var config = new ZeroMqServerConfig("tcp://*:5555", multipartProcessor);
var server = new ZeroMqServer(config, logger);
await server.Run(CancellationToken.None);
Client Sending Multipart:
using NetMQ;
var config = new ZeroMqClientConfig("tcp://localhost:5555", TimeSpan.FromSeconds(5));
var client = new ZeroMqClient(config, logger);
// Create multipart message
var message = new NetMQMessage();
message.Append("CommandType");
message.Append("Payload1");
message.Append("Payload2");
var response = client.Send(message);
Console.WriteLine(response);
// Output: Received 3 parts: CommandType, Payload1, Payload2
Example 4: Fire-and-Forget Messaging
// Configure client for one-way messaging (no response expected)
var config = new ZeroMqClientConfig("tcp://localhost:5555", TimeSpan.FromSeconds(5))
{
ExpectServerResponse = false // Don't wait for response
};
var client = new ZeroMqClient(config, logger);
// Send message without blocking
var response = client.Send("Fire and forget message");
// response will be empty string
Console.WriteLine("Message sent, continuing immediately");
Note: Fire-and-forget mode violates the strict REQ/REP alternation pattern. Use with caution or consider other ZeroMQ patterns (PUSH/PULL, PUB/SUB) for one-way messaging.
Example 5: Error Handling Patterns
using NetMQ;
var config = new ZeroMqClientConfig("tcp://localhost:5555", TimeSpan.FromSeconds(5));
var client = new ZeroMqClient(config, logger);
try
{
var response = client.Send("Request");
Console.WriteLine($"Success: {response}");
}
catch (NetMQException ex) when (ex.Message.Contains("Failed to receive"))
{
// Timeout - server didn't respond
Console.WriteLine("Server timeout - no response received");
}
catch (NetMQException ex)
{
// Other NetMQ errors (connection refused, etc.)
Console.WriteLine($"Communication error: {ex.Message}");
}
catch (Exception ex)
{
// Unexpected errors
Console.WriteLine($"Unexpected error: {ex.Message}");
}
Example 6: Graceful Shutdown Pattern
using SiLA2.IPC.NetMQ;
using System.Threading;
class ServerHost
{
private readonly ZeroMqServer _server;
private readonly CancellationTokenSource _cts;
public ServerHost(ZeroMqServer server)
{
_server = server;
_cts = new CancellationTokenSource();
}
public async Task StartAsync()
{
// Start server in background
var serverTask = Task.Run(async () =>
{
try
{
await _server.Run(_cts.Token);
}
catch (Exception ex)
{
Console.WriteLine($"Server error: {ex.Message}");
}
});
// Wait for startup
await Task.Delay(100);
Console.WriteLine("Server started");
}
public async Task StopAsync()
{
Console.WriteLine("Stopping server...");
// Cancel server operation
_cts.Cancel();
// Wait for graceful shutdown
await Task.Delay(500);
Console.WriteLine("Server stopped");
}
}
// Usage
var config = new ZeroMqServerConfig("tcp://*:5555");
var server = new ZeroMqServer(config, logger);
var host = new ServerHost(server);
await host.StartAsync();
// Run for some time
await Task.Delay(TimeSpan.FromMinutes(5));
await host.StopAsync();
Transport Types
TCP Transport (Network Communication)
Best for: Communication over networks, remote servers, distributed systems
Server:
// Bind to all interfaces
var config = new ZeroMqServerConfig("tcp://*:5555");
// Bind to specific interface
var config = new ZeroMqServerConfig("tcp://192.168.1.100:5555");
// Localhost only (development)
var config = new ZeroMqServerConfig("tcp://127.0.0.1:5555");
Client:
// Connect to remote server
var config = new ZeroMqClientConfig("tcp://192.168.1.100:5555", TimeSpan.FromSeconds(5));
// Connect to localhost
var config = new ZeroMqClientConfig("tcp://localhost:5555", TimeSpan.FromSeconds(5));
Advantages:
- Works across machines
- Firewall-friendly (configurable ports)
- Platform-independent
Considerations:
- Requires network connectivity
- Subject to network latency
- Firewall configuration may be needed
IPC Transport (Local Process Communication)
Best for: High-performance communication between processes on same machine (Unix/Linux)
Server:
// Unix domain socket
var config = new ZeroMqServerConfig("ipc:///tmp/myservice");
// Or in application directory
var config = new ZeroMqServerConfig("ipc:///var/run/sila2/service.ipc");
Client:
var config = new ZeroMqClientConfig("ipc:///tmp/myservice", TimeSpan.FromSeconds(5));
Advantages:
- Faster than TCP (no network stack overhead)
- File-system based permissions
- No port conflicts
Limitations:
- Not available on Windows (use TCP with localhost instead)
- Limited to same machine
- Requires file system path access
Windows Alternative:
// Windows: Use TCP with localhost for local IPC
var config = new ZeroMqServerConfig("tcp://127.0.0.1:5555");
When to Use Each Transport
| Scenario | Recommended Transport | Example |
|---|---|---|
| Same machine (Linux) | IPC | ipc:///tmp/service |
| Same machine (Windows) | TCP localhost | tcp://127.0.0.1:5555 |
| Local network | TCP | tcp://192.168.1.100:5555 |
| Internet | TCP (with TLS proxy) | tcp://server.example.com:5555 |
| Microservices (containers) | TCP | tcp://service-name:5555 |
Integration with SiLA2 Servers
Using IPC for Microservices Communication
Scenario: A SiLA2 temperature controller delegates computation-heavy operations to a worker process.
Controller Server (Main SiLA2 Server):
using SiLA2.IPC.NetMQ;
using SiLA2.AspNetCore;
public class TemperatureFeatureService : TemperatureController.TemperatureControllerBase
{
private readonly ISiLA2Server _siLA2Server;
private readonly ZeroMqClient _workerClient;
private readonly ILogger<TemperatureFeatureService> _logger;
public TemperatureFeatureService(
ISiLA2Server siLA2Server,
ILogger<TemperatureFeatureService> logger)
{
_siLA2Server = siLA2Server;
_logger = logger;
// Setup IPC client to worker process
var clientConfig = new ZeroMqClientConfig(
"tcp://localhost:5556", // Worker process endpoint
TimeSpan.FromSeconds(10));
var clientLogger = LoggerFactory.Create(b => b.AddConsole())
.CreateLogger<ZeroMqClient>();
_workerClient = new ZeroMqClient(clientConfig, clientLogger);
}
public override async Task<SetTemperature_Responses> SetTemperature(
SetTemperature_Parameters request,
ServerCallContext context)
{
// Delegate complex calculation to worker process via IPC
var workerRequest = new
{
Command = "CalculatePID",
TargetTemp = request.TargetTemperature.Value,
CurrentTemp = GetCurrentTemperature()
};
var requestJson = JsonSerializer.Serialize(workerRequest);
try
{
var responseJson = _workerClient.Send(requestJson);
var workerResponse = JsonSerializer.Deserialize<WorkerResponse>(responseJson);
// Apply PID output from worker
ApplyHeatingPower(workerResponse.Output);
return new SetTemperature_Responses
{
EmptyResponse = new Empty()
};
}
catch (NetMQException ex)
{
_logger.LogError($"Worker communication failed: {ex.Message}");
throw new RpcException(new Status(StatusCode.Internal, "Worker unavailable"));
}
}
private double GetCurrentTemperature() => 25.0; // Stub
private void ApplyHeatingPower(double power) { } // Stub
}
public class WorkerResponse
{
public double Output { get; set; }
}
Worker Process (Calculation Engine):
using SiLA2.IPC.NetMQ;
using System.Text.Json;
class WorkerProgram
{
static async Task Main(string[] args)
{
var logger = LoggerFactory.Create(b => b.AddConsole())
.CreateLogger<ZeroMqServer>();
// Worker request processor
Func<NetMQMessage, string> processor = (message) =>
{
var requestJson = message[0].ConvertToString();
var request = JsonSerializer.Deserialize<WorkerRequest>(requestJson);
double output = request.Command switch
{
"CalculatePID" => CalculatePID(request.TargetTemp, request.CurrentTemp),
_ => 0.0
};
var response = new { Output = output };
return JsonSerializer.Serialize(response);
};
var config = new ZeroMqServerConfig("tcp://*:5556", processor);
var server = new ZeroMqServer(config, logger);
Console.WriteLine("Worker process started on tcp://*:5556");
await server.Run(CancellationToken.None);
}
static double CalculatePID(double target, double current)
{
// PID controller logic
var error = target - current;
return error * 0.5; // Simplified P-only controller
}
}
public class WorkerRequest
{
public string Command { get; set; }
public double TargetTemp { get; set; }
public double CurrentTemp { get; set; }
}
Connecting Multiple SiLA2 Server Processes
Coordinator Pattern:
// Main SiLA2 Server coordinates multiple device servers via IPC
public class CoordinatorService
{
private readonly Dictionary<string, ZeroMqClient> _deviceClients;
public CoordinatorService()
{
_deviceClients = new Dictionary<string, ZeroMqClient>
{
["Device1"] = CreateClient("tcp://localhost:5561"),
["Device2"] = CreateClient("tcp://localhost:5562"),
["Device3"] = CreateClient("tcp://localhost:5563")
};
}
private ZeroMqClient CreateClient(string address)
{
var config = new ZeroMqClientConfig(address, TimeSpan.FromSeconds(5));
var logger = LoggerFactory.Create(b => b.AddConsole())
.CreateLogger<ZeroMqClient>();
return new ZeroMqClient(config, logger);
}
public async Task<string> ExecuteParallelCommands(string command)
{
var tasks = _deviceClients.Select(kvp =>
Task.Run(() =>
{
try
{
var response = kvp.Value.Send(command);
return $"{kvp.Key}: {response}";
}
catch (Exception ex)
{
return $"{kvp.Key}: Error - {ex.Message}";
}
})
);
var results = await Task.WhenAll(tasks);
return string.Join("\n", results);
}
}
Example Scenario: Controller → Worker Communication
┌───────────────────────────────────────────────────────────┐
│ SiLA2 Temperature Server │
│ │
│ ┌────────────────────────────────────────────────────┐ │
│ │ TemperatureFeatureService (gRPC) │ │
│ │ │ │
│ │ - Handles SiLA2 client requests │ │
│ │ - Delegates calculations to worker │ │
│ └─────────────────┬──────────────────────────────────┘ │
│ │ IPC via NetMQ │
│ │ (tcp://localhost:5556) │
└────────────────────┼──────────────────────────────────────┘
│
▼
┌──────────────────────┐
│ Worker Process │
│ │
│ - PID calculation │
│ - Heavy compute │
│ - Isolated logic │
└──────────────────────┘
Benefits:
- Process isolation for safety-critical code
- Independent restarts without affecting main server
- Dedicated resource allocation
- Language-agnostic workers (Python, C++, etc.)
Advanced Usage
Custom Request Processing Delegates
Example: Protocol Buffer Serialization
using Google.Protobuf;
// Assuming you have protobuf message types
Func<NetMQMessage, string> protobufProcessor = (message) =>
{
var bytes = message[0].ToByteArray();
// Deserialize protobuf request
var request = MyRequest.Parser.ParseFrom(bytes);
// Process request
var response = new MyResponse
{
Success = true,
Result = ProcessRequest(request)
};
// Serialize response to JSON (or return bytes as base64)
return JsonSerializer.Serialize(response);
};
var config = new ZeroMqServerConfig("tcp://*:5555", protobufProcessor);
var server = new ZeroMqServer(config, logger);
Message Serialization Patterns
Binary Serialization:
using System.Runtime.Serialization.Formatters.Binary;
// Server: Deserialize binary data
Func<NetMQMessage, string> binaryProcessor = (message) =>
{
var bytes = message[0].ToByteArray();
using var ms = new MemoryStream(bytes);
var formatter = new BinaryFormatter();
var obj = formatter.Deserialize(ms);
// Process object...
return "Processed";
};
MessagePack Serialization:
using MessagePack;
// Server: MessagePack deserialization
Func<NetMQMessage, string> msgPackProcessor = (message) =>
{
var bytes = message[0].ToByteArray();
var request = MessagePackSerializer.Deserialize<MyRequest>(bytes);
// Process request...
var response = new MyResponse { Success = true };
var responseBytes = MessagePackSerializer.Serialize(response);
return Convert.ToBase64String(responseBytes);
};
Timeout Handling and Retry Logic
using Polly;
var retryPolicy = Policy
.Handle<NetMQException>()
.WaitAndRetry(3, attempt => TimeSpan.FromSeconds(Math.Pow(2, attempt)));
var config = new ZeroMqClientConfig("tcp://localhost:5555", TimeSpan.FromSeconds(5));
var client = new ZeroMqClient(config, logger);
string response = retryPolicy.Execute(() =>
{
return client.Send("Important request");
});
Performance Considerations
Connection Pooling:
// Bad: Creating new client for each request (expensive)
for (int i = 0; i < 1000; i++)
{
var client = new ZeroMqClient(config, logger);
client.Send($"Request {i}");
}
// Good: Reuse client instance
var client = new ZeroMqClient(config, logger);
for (int i = 0; i < 1000; i++)
{
client.Send($"Request {i}");
}
Message Size:
- ZeroMQ handles large messages efficiently
- Consider chunking for messages > 10MB
- Use multipart messages for structured data
Throughput Optimization:
- Increase socket buffer sizes (if needed)
- Use batch processing for high-frequency messages
- Consider ZeroMQ's DEALER/ROUTER for parallel processing
Security Considerations
Transport Encryption:
NetMQ supports ZeroMQ's CurveZMQ encryption (not implemented in this wrapper). For production:
// Option 1: Use TCP over SSH tunnel
// ssh -L 5555:localhost:5555 user@server
// Option 2: Use reverse proxy with TLS (nginx, Envoy)
// nginx -> tcp://backend:5555
// Option 3: VPN for network-level encryption
Access Control:
// Server-side validation
Func<NetMQMessage, string> secureProcessor = (message) =>
{
var request = message[0].ConvertToString();
// Validate request format
if (!IsValidRequest(request))
return "Error: Invalid request";
// Process valid request
return ProcessRequest(request);
};
API Reference
Key Interface Methods
IZeroMqServer
// Start server and process requests
Task Run(CancellationToken cancellationToken = default)
// - Binds to configured address
// - Continuously processes incoming messages
// - Returns when cancellation requested
// Set custom request processor
void SetRequestProcessing(Func<NetMQMessage, string> requestProcessing)
// - Updates the message processing delegate
// - Takes effect for all subsequent messages
IZeroMqClient
// Send single-frame string message
string Send(string message)
// - Creates RequestSocket
// - Connects to server
// - Sends message as single frame
// - Optionally waits for response
// - Returns response or empty string
// Send multipart message
string Send(NetMQMessage netMQMessage)
// - Creates RequestSocket
// - Connects to server
// - Sends multipart message
// - Optionally waits for response
// - Returns response or empty string
Configuration Properties
ZeroMqServerConfig
| Property | Type | Description |
|---|---|---|
ServerSocketAddress |
string |
Binding address (e.g., "tcp://*:5555") |
ProcessRequest |
Func<NetMQMessage, string> |
Request processing delegate |
ZeroMqClientConfig
| Property | Type | Description | Default |
|---|---|---|---|
ServerSocketAddress |
string |
Server connection address | Required |
Timeout |
TimeSpan |
Response timeout | Required |
UseSingleFrame |
bool |
Use single-frame messages | true |
ExpectServerResponse |
bool |
Wait for server response | true |
Message Flow Diagram
Client Server
│ │
│ 1. Create RequestSocket │
│ 2. Connect(address) │ 1. Create ResponseSocket
│ │ 2. Bind(address)
│ │ 3. Wait for message
│ │
│────── Send Message ──────────────────>│
│ │
│ │ 4. ReceiveMultipartMessage()
│ │ 5. ProcessRequest(message)
│ │
│<────── Response ──────────────────────│
│ │
│ 3. ReceiveFrameString() │ 6. SendFrame(response)
│ 4. Dispose socket │ 7. Loop back to step 3
│ │
Dependencies
SiLA2.IPC.NetMQ has the following NuGet dependencies:
| Package | Version | Purpose |
|---|---|---|
NetMQ |
4.0.2.2 | ZeroMQ C# port - core messaging library |
Microsoft.Extensions.Logging |
10.0.2 | Logging abstractions for diagnostic output |
System.Security.Cryptography.Xml |
10.0.2 | Cryptographic operations (dependency of NetMQ) |
Platform Support
- Target Framework: .NET 10.0
- Operating Systems: Windows, Linux, macOS
- Architecture: Platform-independent (AnyCPU)
- Transport Support:
- TCP: All platforms
- IPC: Linux and macOS only (not available on Windows)
- Inproc: All platforms
Troubleshooting
Issue: Port Already in Use
Symptom: Server fails to bind with "Address already in use" error
Solution:
# Check if port is in use
# Linux/macOS
netstat -tuln | grep 5555
# Windows
netstat -ano | findstr 5555
# Use different port
var config = new ZeroMqServerConfig("tcp://*:5556");
Issue: Connection Refused
Symptom: Client throws exception "Connection refused"
Checks:
- Verify server is running
- Check firewall rules
- Verify correct IP address and port
- Ensure server is bound to correct interface
// Server: Bind to all interfaces (not just localhost)
var config = new ZeroMqServerConfig("tcp://*:5555"); // Good
// Not:
var config = new ZeroMqServerConfig("tcp://127.0.0.1:5555"); // Only localhost
Issue: Timeout Errors
Symptom: Client throws "Failed to receive Server Response"
Solutions:
// Increase timeout
var config = new ZeroMqClientConfig(
"tcp://localhost:5555",
timeout: TimeSpan.FromSeconds(30) // Increased from 5s
);
// Check server is processing messages
// Add logging to server ProcessRequest delegate
Func<NetMQMessage, string> processor = (message) =>
{
Console.WriteLine($"Processing: {message}");
var result = SlowOperation(); // Identify slow operations
Console.WriteLine("Done processing");
return result;
};
Issue: Socket Cleanup Problems
Symptom: Sockets not properly disposed, resource leaks
Solution:
// NetMQ requires proper cleanup
// The implementation handles this automatically via using statements
// If implementing custom logic:
using (var socket = new RequestSocket())
{
socket.Connect(address);
socket.SendFrame(message);
var response = socket.ReceiveFrameString();
} // Socket disposed here
Issue: Firewall Configuration
Windows Firewall:
# PowerShell (as Administrator)
New-NetFirewallRule -DisplayName "NetMQ Port 5555" `
-Direction Inbound `
-Protocol TCP `
-LocalPort 5555 `
-Action Allow
Linux Firewall (ufw):
sudo ufw allow 5555/tcp
Linux Firewall (firewalld):
sudo firewall-cmd --zone=public --add-port=5555/tcp --permanent
sudo firewall-cmd --reload
Performance and Best Practices
Connection Pooling Patterns
Good Practice:
// Reuse client instances
public class WorkerPool
{
private readonly ZeroMqClient _client;
public WorkerPool(string address)
{
var config = new ZeroMqClientConfig(address, TimeSpan.FromSeconds(5));
var logger = LoggerFactory.Create(b => b.AddConsole())
.CreateLogger<ZeroMqClient>();
_client = new ZeroMqClient(config, logger);
}
public string SendWork(string work)
{
return _client.Send(work);
}
}
Message Size Considerations
Small Messages (< 1KB):
- Optimal for NetMQ
- Minimal overhead
- Fast serialization
Medium Messages (1KB - 1MB):
- Use compression if appropriate
- Consider JSON or MessagePack
Large Messages (> 1MB):
- Chunk into smaller messages
- Use multipart messages
- Consider file transfer or shared storage
// Chunking large data
const int CHUNK_SIZE = 1024 * 1024; // 1MB chunks
for (int offset = 0; offset < data.Length; offset += CHUNK_SIZE)
{
var chunk = data.Skip(offset).Take(CHUNK_SIZE).ToArray();
var chunkJson = JsonSerializer.Serialize(new
{
Offset = offset,
Data = Convert.ToBase64String(chunk),
IsLast = offset + CHUNK_SIZE >= data.Length
});
client.Send(chunkJson);
}
When to Use vs Not Use NetMQ
Use NetMQ When:
- ✅ Need lightweight IPC between processes
- ✅ Microservices on same machine/network
- ✅ High-frequency message passing
- ✅ Worker pool patterns
- ✅ Command distribution
- ✅ Integration with ZeroMQ ecosystem
Don't Use NetMQ When:
- ❌ Need SiLA2 protocol compliance (use gRPC)
- ❌ Client-server feature implementation (use gRPC)
- ❌ Need HTTP/REST compatibility
- ❌ Want automatic load balancing (use Kubernetes + gRPC)
- ❌ Need complex message routing (consider message brokers)
Comparison with gRPC
Understanding when to use NetMQ vs gRPC in your SiLA2 system:
| Feature | NetMQ (SiLA2.IPC.NetMQ) | gRPC (SiLA2.Core) |
|---|---|---|
| Primary Use | Inter-process messaging | SiLA2 protocol implementation |
| Message Pattern | Request/Reply (REQ/REP) | Unary, Streaming, Bidirectional |
| Protocol | ZeroMQ wire protocol | HTTP/2 |
| Serialization | Custom (string, JSON, binary) | Protocol Buffers |
| Performance | Very fast (local IPC) | Fast (network optimized) |
| Transport | TCP, IPC, inproc | TCP, Unix domain sockets |
| Service Discovery | Manual | Manual or service mesh |
| Security | Manual (TLS proxy, SSH tunnel) | Built-in TLS/SSL |
| Code Generation | Not required | Required (protoc) |
| Learning Curve | Simple | Moderate |
| Best For | Microservices coordination | SiLA2 client-server communication |
Use Case Recommendations
Use gRPC (SiLA2.Core) for:
- Implementing SiLA2 features (Commands, Properties, Metadata)
- Client applications connecting to SiLA2 servers
- Cross-language compatibility requirements
- Internet-facing services
- Complex streaming scenarios
Use NetMQ (SiLA2.IPC.NetMQ) for:
- Communication between SiLA2 server and worker processes
- Microservices coordination within same machine
- High-frequency local messaging
- Distributing commands to device controllers
- Lightweight message passing without protocol overhead
Example Architecture Using Both:
┌─────────────────────────────────────────────────┐
│ SiLA2 Client (Python, C++, C#) │
│ │
└────────────────┬────────────────────────────────┘
│
│ gRPC (SiLA2 Protocol)
│
▼
┌─────────────────────────────────────────────────┐
│ SiLA2 Temperature Server (C#) │
│ │
│ ┌───────────────────────────────────────────┐ │
│ │ TemperatureController Feature (gRPC) │ │
│ └──────────────┬────────────────────────────┘ │
│ │ │
│ │ NetMQ (IPC) │
│ │ │
│ ┌──────────────▼────────────────────────────┐ │
│ │ Worker Process (PID Controller) │ │
│ └───────────────────────────────────────────┘ │
└─────────────────────────────────────────────────┘
Additional Resources
- SiLA Consortium: https://sila-standard.com
- SiLA2 C# Wiki: https://gitlab.com/SiLA2/sila_csharp/-/wikis/home
- Main Repository: https://gitlab.com/SiLA2/sila_csharp
- Issues & Feature Requests: https://gitlab.com/SiLA2/sila_csharp/-/issues
ZeroMQ and NetMQ Resources
- ZeroMQ Guide: https://zguide.zeromq.org/
- NetMQ Documentation: https://netmq.readthedocs.io/
- ZeroMQ Patterns: https://zeromq.org/socket-api/
- NetMQ GitHub: https://github.com/zeromq/netmq
Related Modules
- SiLA2.Core - Main server implementation and domain models
- SiLA2.Utils - Network utilities, gRPC channels, certificate management
- SiLA2.Database.NoSQL - NoSQL database abstractions (LiteDB)
- SiLA2.Audit - Audit logging for compliance
External Documentation
- ZeroMQ Messaging Patterns: http://zguide.zeromq.org/page:all#toc3
- REQ/REP Pattern: https://zeromq.org/socket-api/#request-reply-pattern
- NetMQ Performance Tips: https://netmq.readthedocs.io/en/latest/performance/
Contributing
This library is part of the SiLA2 C# implementation. Contributions are welcome!
- Fork the repository
- Create a feature branch
- Make your changes
- Add unit tests
- Submit a merge request
For issues and feature requests, visit: https://gitlab.com/SiLA2/sila_csharp/-/issues
License
This project is licensed under the MIT License. See the repository for details.
| Product | Versions Compatible and additional computed target framework versions. |
|---|---|
| .NET | net10.0 is compatible. net10.0-android was computed. net10.0-browser was computed. net10.0-ios was computed. net10.0-maccatalyst was computed. net10.0-macos was computed. net10.0-tvos was computed. net10.0-windows was computed. |
-
net10.0
- Microsoft.Extensions.Logging (>= 10.0.3)
- NetMQ (>= 4.0.2.2)
- System.Security.Cryptography.Xml (>= 10.0.3)
NuGet packages
This package is not used by any NuGet packages.
GitHub repositories
This package is not used by any popular GitHub repositories.
| Version | Downloads | Last Updated |
|---|---|---|
| 10.2.2 | 94 | 2/12/2026 |
| 10.2.1 | 96 | 1/25/2026 |
| 10.2.0 | 199 | 12/23/2025 |
| 10.1.0 | 153 | 11/29/2025 |
| 10.0.0 | 330 | 11/11/2025 |
| 9.0.4 | 276 | 6/25/2025 |
| 9.0.3 | 210 | 6/21/2025 |
| 9.0.2 | 226 | 1/6/2025 |
| 9.0.1 | 223 | 11/17/2024 |
| 9.0.0 | 232 | 11/13/2024 |
| 8.1.2 | 228 | 10/20/2024 |
| 8.1.1 | 246 | 8/31/2024 |
| 8.1.0 | 271 | 2/11/2024 |
| 8.0.0 | 277 | 11/15/2023 |
| 7.5.4 | 209 | 10/27/2023 |
| 7.5.3 | 247 | 7/19/2023 |
| 7.5.2 | 261 | 7/3/2023 |
| 7.5.1 | 267 | 6/2/2023 |
| 7.4.6 | 289 | 5/21/2023 |
| 7.4.5 | 278 | 5/7/2023 |