Zonit.Messaging.Schedules.Abstractions 1.1.4

dotnet add package Zonit.Messaging.Schedules.Abstractions --version 1.1.4
                    
NuGet\Install-Package Zonit.Messaging.Schedules.Abstractions -Version 1.1.4
                    
This command is intended to be used within the Package Manager Console in Visual Studio, as it uses the NuGet module's version of Install-Package.
<PackageReference Include="Zonit.Messaging.Schedules.Abstractions" Version="1.1.4" />
                    
For projects that support PackageReference, copy this XML node into the project file to reference the package.
<PackageVersion Include="Zonit.Messaging.Schedules.Abstractions" Version="1.1.4" />
                    
Directory.Packages.props
<PackageReference Include="Zonit.Messaging.Schedules.Abstractions" />
                    
Project file
For projects that support Central Package Management (CPM), copy this XML node into the solution Directory.Packages.props file to version the package.
paket add Zonit.Messaging.Schedules.Abstractions --version 1.1.4
                    
#r "nuget: Zonit.Messaging.Schedules.Abstractions, 1.1.4"
                    
#r directive can be used in F# Interactive and Polyglot Notebooks. Copy this into the interactive tool or source code of the script to reference the package.
#:package Zonit.Messaging.Schedules.Abstractions@1.1.4
                    
#:package directive can be used in C# file-based apps starting in .NET 10 preview 4. Copy this into a .cs file before any lines of code to reference the package.
#addin nuget:?package=Zonit.Messaging.Schedules.Abstractions&version=1.1.4
                    
Install as a Cake Addin
#tool nuget:?package=Zonit.Messaging.Schedules.Abstractions&version=1.1.4
                    
Install as a Cake Tool

Zonit.Messaging

A lightweight, high-performance .NET library for building event-driven and CQRS architectures with full AOT/Trimming support.


📦 NuGet Packages

Current Packages (Zonit.Messaging)

Package Version Downloads Description
Zonit.Messaging.Commands NuGet NuGet CQRS Commands & Queries
Zonit.Messaging.Commands.Abstractions NuGet NuGet Command interfaces
Zonit.Messaging.Events NuGet NuGet Pub/Sub Events
Zonit.Messaging.Events.Abstractions NuGet NuGet Event interfaces
Zonit.Messaging.Tasks NuGet NuGet Background Jobs
Zonit.Messaging.Tasks.Abstractions NuGet NuGet Task interfaces
Zonit.Messaging.Schedules NuGet NuGet Recurring Jobs
Zonit.Messaging.Schedules.Abstractions NuGet NuGet Schedule interfaces

Legacy Packages (deprecated)

Package Version Downloads Status
Zonit.Services.EventMessage NuGet NuGet ⚠️ Deprecated
Zonit.Services.EventMessage.Abstractions NuGet NuGet ⚠️ Deprecated
# Install current packages
dotnet add package Zonit.Messaging.Commands
dotnet add package Zonit.Messaging.Events
dotnet add package Zonit.Messaging.Tasks
dotnet add package Zonit.Messaging.Schedules

# Or via NuGet Package Manager
Install-Package Zonit.Messaging.Commands
Install-Package Zonit.Messaging.Events
Install-Package Zonit.Messaging.Tasks
Install-Package Zonit.Messaging.Schedules

Features

  • Commands (CQRS) - Request/Response pattern with strongly-typed handlers
  • Events (Pub/Sub) - Publish events to multiple subscribers (fan-out)
  • Tasks (Background Jobs) - Queue long-running operations with retry support
  • Schedules (Recurring Jobs) - Execute jobs on schedule (cron-like, strongly-typed)
  • Transaction Support - Group events into transactions to be processed sequentially
  • AOT-Safe - Full Native AOT and trimming support via Source Generators
  • Concurrent Processing - Control the number of concurrently executed handlers
  • Timeout Handling - Configure timeouts for processing

Requirements

  • .NET 8, .NET 9 or .NET 10

Quick Start

Just call the registration methods - they work with or without handlers:

using Zonit.Messaging.Commands;
using Zonit.Messaging.Events;
using Zonit.Messaging.Tasks;
using Zonit.Messaging.Schedules;

// In your DI configuration (Program.cs or plugin registration)
services.AddCommandHandlers();  // Registers Commands (CQRS)
services.AddEventHandlers();    // Registers Events (Pub/Sub)  
services.AddTaskHandlers();     // Registers Tasks (Background Jobs)
services.AddScheduleServices(); // Registers Schedules (Recurring Jobs)

That's it! These methods:

  • ? Always work - even without any handlers
  • ? Auto-discover handlers - Source Generator automatically registers handlers at compile-time
  • ? Safe to call multiple times - uses TryAdd to prevent duplicates
  • ? AOT/Trimming safe - no runtime reflection

Plugin Architecture

For modular applications, call the methods in each plugin:

// In Kemavo.Plugins.Catalogs.Application
namespace Kemavo.Plugins;

public static class ServiceCollectionExtensions
{
    public static IServiceCollection AddCatalogsPlugin(this IServiceCollection services)
    {
        // These methods exist in Zonit.Messaging.* libraries
        // Source Generator automatically adds handlers from this assembly
        services.AddCommandHandlers();
        services.AddEventHandlers();
        services.AddTaskHandlers();
        services.AddScheduleServices();
        
        return services;
    }
}

// In Program.cs
services.AddCatalogsPlugin();
services.AddWalletsPlugin();
services.AddAIPlugin();

Manual Handler Registration

For fine-grained control, you can register handlers explicitly (100% AOT-safe):

// Commands - specify handler, request, and response types
services.AddCommand<CreateUserHandler, CreateUserCommand, Guid>();

// Events - specify handler and event types
services.AddEvent<UserCreatedHandler, UserCreatedEvent>();

// Tasks - specify handler and task types
services.AddTask<SendEmailHandler, SendEmailTask>();

// Schedules - specify handler and data types
services.AddScheduleHandler<CleanupHandler, CleanupJobData>();

Commands (CQRS)

Request/Response pattern - send a request, get a typed response.

1. Define a Command

public record CreateUserCommand(string Name, string Email) : IRequest<Guid>;

2. Implement Handler

public class CreateUserHandler : IRequestHandler<CreateUserCommand, Guid>
{
    public async Task<Guid> HandleAsync(CreateUserCommand request, CancellationToken ct = default)
    {
        var userId = Guid.NewGuid();
        // Save to database...
        return userId;
    }
}

3. Send Command

var commandProvider = serviceProvider.GetRequiredService<ICommandProvider>();
var userId = await commandProvider.SendAsync(new CreateUserCommand("John", "john@example.com"));

Events (Pub/Sub)

Publish events to multiple subscribers asynchronously.

Create a class implementing IEventHandler<T> for automatic registration:

public record UserCreatedEvent(string Name, string Email);

public class UserCreatedEventHandler : IEventHandler<UserCreatedEvent>
{
    private readonly ILogger<UserCreatedEventHandler> _logger;
    
    public UserCreatedEventHandler(ILogger<UserCreatedEventHandler> logger)
    {
        _logger = logger;
    }
    
    public async Task HandleAsync(UserCreatedEvent data, CancellationToken cancellationToken)
    {
        _logger.LogInformation("User created: {Name}", data.Name);
        await SendWelcomeEmailAsync(data.Email, cancellationToken);
    }
}

2. Subscribe Manually (Alternative)

var eventManager = serviceProvider.GetRequiredService<IEventManager>();

eventManager.Subscribe<UserCreatedEvent>(async (data, cancellationToken) =>
{
    Console.WriteLine($"User created: {data.Name}");
    await Task.CompletedTask;
});

3. Publish Events

var eventProvider = serviceProvider.GetRequiredService<IEventProvider>();
eventProvider.Publish(new UserCreatedEvent("John", "john@example.com"));

3. Using Transactions

Group events to be processed sequentially:

using (var transaction = eventProvider.CreateTransaction())
{
    eventProvider.Publish(new Event1());
    eventProvider.Publish(new Event2());
    // Events are queued until the transaction is completed
}
// Events are processed after the transaction is disposed

4. Awaiting Transaction Completion

Wait for all events in a transaction to be processed:

using (var transaction = eventProvider.CreateTransaction())
{
    eventProvider.Publish(new OrderCreatedEvent());
    eventProvider.Publish(new InventoryUpdatedEvent());
    
    // Wait for all events to be processed before continuing
    await transaction.WaitForCompletionAsync();
}

Tasks (Background Jobs)

Long-running operations with retry support and real-time progress tracking.

1. Simple Task Handler

var taskManager = serviceProvider.GetRequiredService<ITaskManager>();

taskManager.Subscribe<SendEmailTask>(async payload =>
{
    await SendEmailAsync(payload.Data.To, payload.Data.Subject);
}, new TaskSubscriptionOptions
{
    WorkerCount = 5,
    MaxRetries = 3,
    RetryDelay = TimeSpan.FromSeconds(10)
});

2. Task Handler with Progress Tracking

Create handlers with automatic progress reporting based on estimated step durations:

public class ImportDataHandler : TaskHandler<ImportDataTask>
{
    public override int WorkerCount => 2;
    public override TimeSpan Timeout => TimeSpan.FromMinutes(10);
    
    // Optional: Display title and description in UI
    public override string? Title => "Import Data";
    public override string? Description => "Importing data from external source";

    // Define steps with estimated durations for smooth progress calculation
    public override TaskProgressStep[]? ProgressSteps =>
    [
        new(TimeSpan.FromSeconds(5), "Connecting to source..."),
        new(TimeSpan.FromSeconds(10), "Downloading data..."),
        new(TimeSpan.FromSeconds(15), "Processing records..."),
        new(TimeSpan.FromSeconds(5), "Saving to database...")
    ];

    protected override async Task HandleAsync(
        ImportDataTask data,
        ITaskProgressContext progress,
        CancellationToken cancellationToken)
    {
        // Step 1: Connect (0% -> 14%)
        await progress.NextAsync();
        await ConnectAsync(cancellationToken);

        // Step 2: Download (14% -> 43%)
        await progress.NextAsync();
        await DownloadAsync(data.Url, cancellationToken);

        // Step 3: Process (43% -> 86%)
        await progress.NextAsync();
        for (int i = 0; i < data.RecordCount; i++)
        {
            await ProcessRecordAsync(i, cancellationToken);
            // Update message without changing step
            await progress.SetMessageAsync($"Processing {i + 1}/{data.RecordCount}...");
        }

        // Step 4: Save (86% -> 100%)
        await progress.NextAsync();
        await SaveAsync(cancellationToken);
    }
}

With Source Generators, registration is automatic:

// In Program.cs - handlers are auto-discovered and registered
services.AddTaskHandlers();

3. Publish Tasks

var taskProvider = serviceProvider.GetRequiredService<ITaskProvider>();

// Simple publish
taskProvider.Publish(new SendEmailTask { To = "user@example.com", Subject = "Welcome!" });

// Publish with ExtensionId for filtering
var organizationId = Guid.NewGuid();
taskProvider.Publish(new ImportDataTask("data.csv", 1000), organizationId);

4. Monitoring Task Progress

Subscribe to real-time progress updates:

var taskManager = serviceProvider.GetRequiredService<ITaskManager>();

// Monitor all tasks
taskManager.OnChange(state =>
{
    Console.WriteLine($"Task {state.TaskType}: {state.Progress}% - {state.Message}");
    Console.WriteLine($"Duration: {state.Duration}");
});

// Monitor specific task type with typed data access
taskManager.OnChange<ImportDataTask>(state =>
{
    Console.WriteLine($"Import from {state.Data.Source}: {state.Progress}%");
    Console.WriteLine($"Step {state.CurrentStep}/{state.TotalSteps}: {state.Message}");
    Console.WriteLine($"Running for: {state.Duration?.TotalSeconds:F1}s");
});

// Monitor multiple task types at once (efficient filtering)
taskManager.OnChange<ImportDataTask, ExportDataTask>(state =>
{
    // Receives updates only for ImportDataTask or ExportDataTask
    UpdateProgress(state.TaskType, state.Progress);
});

// Monitor up to 4 types simultaneously
taskManager.OnChange<Task1, Task2, Task3, Task4>(state =>
{
    // Efficient server-side filtering
    NotifyUI(state);
});

// Monitor tasks for specific ExtensionId (e.g., organization)
taskManager.OnChange(organizationId, state =>
{
    // Only tasks published with this ExtensionId
    UpdateProgressBar(state.Progress ?? 0);
});

// Monitor specific type for specific ExtensionId
taskManager.OnChange<ImportDataTask>(organizationId, state =>
{
    // Typed access + filtered by ExtensionId
    UpdateUI(state.Data, state.Progress);
});

5. TaskState Properties

Property Type Description
TaskId Guid Unique task identifier
ExtensionId Guid? Optional identifier for filtering (e.g., user/organization ID)
TaskType string Full type name of the task
Title string? Optional display title for the task (null = uses TaskType)
Description string? Optional description of what the task does
Status TaskStatus Current status (Pending, Processing, Completed, Failed, Cancelled)
Progress int? Progress 0-100 (null if not tracked)
CurrentStep int? Current step number (1-based)
TotalSteps int? Total number of steps
Message string? Current status message
CreatedAt DateTimeOffset When task was created
StartedAt DateTimeOffset? When processing started
CompletedAt DateTimeOffset? When processing finished
Duration TimeSpan? Time elapsed since start

6. Viewing Active Tasks

// Get all active tasks
var activeTasks = taskManager.GetActiveTasks();

// Get active tasks for specific ExtensionId
var orgTasks = taskManager.GetActiveTasks(organizationId);

// Get active tasks of specific type with typed data access
var importTasks = taskManager.GetActiveTasks<ImportDataTask>();
foreach (var task in importTasks)
{
    Console.WriteLine($"Import from {task.Data.Source}: {task.Progress}% - {task.Duration}");
}

// Get active tasks of multiple types
var dataTasks = taskManager.GetActiveTasks<ImportDataTask, ExportDataTask>();
var allProcessingTasks = taskManager.GetActiveTasks<Task1, Task2, Task3>();

// Get tasks filtered by type AND ExtensionId
var orgImports = taskManager.GetActiveTasks<ImportDataTask>(organizationId);

foreach (var task in activeTasks)
{
    Console.WriteLine($"{task.TaskType}: {task.Status} ({task.Progress}%) - {task.Duration}");
}

// Get specific task state
var state = taskManager.GetTaskState(taskId);

7. Task Lifecycle

Tasks go through various states during their lifecycle:

  • Pending - Task is queued but not yet started
  • Processing - Task is currently being processed
  • Completed - Task finished successfully
  • Failed - Task failed during processing
  • Cancelled - Task was cancelled before completion

8. Progress Tracking Features

  • Time-based smooth progress: Progress is automatically calculated based on estimated step durations
  • Automatic updates: Progress updates are sent automatically (max 100 times, when % changes)
  • Step tracking: Know which step is currently executing and how many remain
  • Duration tracking: Real-time duration available via Duration property
  • Typed access: Use OnChange<T> to get typed access to task data
  • Efficient filtering: Filter by ExtensionId at the system level for better performance
  • Multi-type monitoring: Subscribe to multiple task types simultaneously (2-4 types) with single handler

9. ITaskManager API Reference

GetActiveTasks Methods
Method Returns Description
GetActiveTasks() IReadOnlyCollection<TaskState> All active tasks
GetActiveTasks(extensionId) IReadOnlyCollection<TaskState> Active tasks for specific ExtensionId
GetActiveTasks<TTask>() IReadOnlyCollection<TaskState<TTask>> Active tasks of specific type with typed data
GetActiveTasks<TTask>(extensionId) IReadOnlyCollection<TaskState<TTask>> Active tasks of type for ExtensionId
GetActiveTasks<T1, T2>() IReadOnlyCollection<TaskState> Active tasks of 2 types
GetActiveTasks<T1, T2, T3>() IReadOnlyCollection<TaskState> Active tasks of 3 types
GetActiveTasks<T1, T2, T3, T4>() IReadOnlyCollection<TaskState> Active tasks of 4 types
GetTaskState(taskId) TaskState? Specific task by ID
OnChange Methods
Method Parameters Description
OnChange(handler) Action<TaskState> Monitor all task changes
OnChange(extensionId, handler) Guid, Action<TaskState> Monitor tasks for ExtensionId
OnChange<TTask>(handler) Action<TaskState<TTask>> Monitor specific task type with typed data
OnChange<TTask>(extensionId, handler) Guid, Action<TaskState<TTask>> Monitor type for ExtensionId
OnChange<T1, T2>(handler) Action<TaskState> Monitor 2 task types
OnChange<T1, T2, T3>(handler) Action<TaskState> Monitor 3 task types
OnChange<T1, T2, T3, T4>(handler) Action<TaskState> Monitor 4 task types

All OnChange methods return IDisposable for unsubscribing.

Example: Monitoring Multiple Types

// Efficient server-side filtering - only 2 types are monitored
var subscription = taskManager.OnChange<ImportTask, ExportTask>(state =>
{
    // This handler only receives ImportTask and ExportTask updates
    // Much more efficient than filtering in the handler
    Console.WriteLine($"{state.TaskType}: {state.Progress}%");
});

// Unsubscribe when done
subscription.Dispose();

Schedules (Recurring Jobs)

Execute jobs on a recurring schedule using strongly-typed Schedule ValueObject (cron-like but with compile-time safety).

1. Define Handler

public record CleanupJobData(string Directory, int RetentionDays);

public class CleanupHandler : IScheduleHandler<CleanupJobData>
{
    private readonly ILogger<CleanupHandler> _logger;

    public CleanupHandler(ILogger<CleanupHandler> logger)
    {
        _logger = logger;
    }

    public async Task HandleAsync(CleanupJobData data, CancellationToken cancellationToken)
    {
        _logger.LogInformation("Cleanup: {Dir}, retention: {Days} days", 
            data.Directory, data.RetentionDays);
        
        // Perform cleanup...
        await Task.CompletedTask;
    }
}

2. Register Handlers

using Zonit.Messaging.Schedules;

// Option A: Automatic registration via Source Generator (Recommended)
services.AddScheduleServices();  // Source Generator auto-discovers IScheduleHandler<T> implementations

// Option B: Manual registration (100% AOT-safe)
services.AddScheduleHandler<CleanupHandler, CleanupJobData>();

3. Start Schedules

public class MyService
{
    private readonly IScheduleProvider _scheduleProvider;

    public MyService(IScheduleProvider scheduleProvider)
    {
        _scheduleProvider = scheduleProvider;
    }

    public void StartJobs()
    {
        // Every 5 minutes
        var id1 = _scheduleProvider.Start(
            new CleanupJobData("/tmp", 7),
            Schedule.EveryMinutes(5)
        );

        // Daily at 3:00 AM
        var id2 = _scheduleProvider.Start(
            new CleanupJobData("/logs", 30),
            Schedule.EveryDay(3, 0)
        );

        // Multiple schedules (8:00 AM and 6:00 PM)
        var id3 = _scheduleProvider.Start(
            new CleanupJobData("/cache", 1),
            Schedule.EveryDay(8, 0),
            Schedule.EveryDay(18, 0)
        );

        // With options
        var id4 = _scheduleProvider.Start(
            new CleanupJobData("/data", 14),
            options =>
            {
                options.Name = "DataCleanup";
                options.Schedules = [Schedule.EveryHour(atMinute: 30)];
                options.ExecuteOnStartup = true;
            }
        );
    }
}

4. Managing Schedules

// Pause/Resume
_scheduleProvider.Pause(id);
_scheduleProvider.Resume(id);

// Stop permanently
_scheduleProvider.Stop(id);

// Trigger immediate execution
_scheduleProvider.TriggerNow(id);

// Get state
var state = _scheduleProvider.GetState(id);
Console.WriteLine($"Status: {state?.Status}");
Console.WriteLine($"Last run: {state?.LastExecutionAt}");
Console.WriteLine($"Next run: {state?.NextExecutionAt}");
Console.WriteLine($"Executions: {state?.ExecutionCount}");

// Get all schedules
var all = _scheduleProvider.GetAllSchedules();
var active = _scheduleProvider.GetActiveSchedules();

5. Monitor State Changes

// Subscribe to all schedule changes
_scheduleProvider.OnChange(state =>
{
    Console.WriteLine($"{state.Name}: {state.Status} ({state.ExecutionCount} runs)");
});

// Subscribe to specific schedule
_scheduleProvider.OnChange(scheduleId, state =>
{
    if (state.Status == ScheduleStatus.Failed)
        AlertAdmin(state.LastError);
});

6. Schedule Factory Methods

Method Description Example
EverySeconds(n) Every N seconds Schedule.EverySeconds(30)
EveryMinutes(n) Every N minutes Schedule.EveryMinutes(5)
EveryHours(n) Every N hours Schedule.EveryHours(2)
EveryDays(n) Every N days Schedule.EveryDays(1)
EveryMinute() Every minute at :00 Schedule.EveryMinute()
EveryHour(min) Every hour at minute Schedule.EveryHour(30)
EveryDay(h, m) Daily at time Schedule.EveryDay(15, 0)
EveryWeek(day, h, m) Weekly on day Schedule.EveryWeek(DayOfWeek.Monday, 9, 0)
EveryMonth(day, h, m) Monthly on day Schedule.EveryMonth(1, 0, 0)
EveryYear(mo, d, h, m) Yearly on date Schedule.EveryYear(1, 1, 0, 0)

7. ScheduleState Properties

Property Type Description
Id ScheduleId Unique schedule identifier
Name string Schedule name
Status ScheduleStatus Pending, Running, Paused, Stopped, Completed, Failed
Schedules Schedule[] Schedule rules
CreatedAt DateTimeOffset When schedule was created
LastExecutionAt DateTimeOffset? When last execution occurred
NextExecutionAt DateTimeOffset? When next execution is scheduled
ExecutionCount int Total number of executions
ConsecutiveFailures int Number of consecutive failures
LastError string? Last error message
LastExecutionDuration TimeSpan? Duration of last execution

Migration from Legacy API

If upgrading from Zonit.Services.EventMessage:

Legacy (deprecated) New
using Zonit.Services.EventMessage; using Zonit.Messaging.Events;
services.AddEventMessageService() services.AddEventHandlers()
EventBase<T> IEventHandler<T>
TaskBase<T> TaskHandler<T>
PayloadModel<T> Direct TEvent data parameter
payload.Data / payload.CancellationToken (data, cancellationToken) parameters

Event Handler Migration Example

Before (Legacy):

public class UserHandler : EventBase<UserCreatedEvent>
{
    protected override async Task HandleAsync(UserCreatedEvent data, CancellationToken ct)
    {
        // Handle event
    }
}

After (New API):

public class UserHandler : IEventHandler<UserCreatedEvent>
{
    public async Task HandleAsync(UserCreatedEvent data, CancellationToken cancellationToken)
    {
        // Handle event - same signature, cleaner interface!
    }
}

Legacy code continues to work but shows deprecation warnings.


Example Use Cases

  • Decoupling logic between independent application components
  • Implementing event-driven workflows
  • Handling system notifications and real-time updates
  • Creating robust background job queues
  • CQRS architecture implementation

Contributing & Support

Found a bug or have a feature request? Open an issue on GitHub!


License

MIT

Product Compatible and additional computed target framework versions.
.NET net8.0 is compatible.  net8.0-android was computed.  net8.0-browser was computed.  net8.0-ios was computed.  net8.0-maccatalyst was computed.  net8.0-macos was computed.  net8.0-tvos was computed.  net8.0-windows was computed.  net9.0 is compatible.  net9.0-android was computed.  net9.0-browser was computed.  net9.0-ios was computed.  net9.0-maccatalyst was computed.  net9.0-macos was computed.  net9.0-tvos was computed.  net9.0-windows was computed.  net10.0 is compatible.  net10.0-android was computed.  net10.0-browser was computed.  net10.0-ios was computed.  net10.0-maccatalyst was computed.  net10.0-macos was computed.  net10.0-tvos was computed.  net10.0-windows was computed. 
Compatible target framework(s)
Included target framework(s) (in package)
Learn more about Target Frameworks and .NET Standard.

NuGet packages (1)

Showing the top 1 NuGet packages that depend on Zonit.Messaging.Schedules.Abstractions:

Package Downloads
Zonit.Messaging.Schedules

Scheduled job processing with cron-like and interval-based scheduling, retry support, and real-time monitoring. Part of Zonit.Messaging - a lightweight library for event-driven architectures with full AOT/Trimming support.

GitHub repositories

This package is not used by any popular GitHub repositories.

Version Downloads Last Updated
1.1.4 86 1/22/2026
1.1.3 89 1/22/2026