SharpPulsar 2.10.0-rc1132

This is a prerelease version of SharpPulsar.
There is a newer version of this package available.
See the version list below for details.
dotnet add package SharpPulsar --version 2.10.0-rc1132
                    
NuGet\Install-Package SharpPulsar -Version 2.10.0-rc1132
                    
This command is intended to be used within the Package Manager Console in Visual Studio, as it uses the NuGet module's version of Install-Package.
<PackageReference Include="SharpPulsar" Version="2.10.0-rc1132" />
                    
For projects that support PackageReference, copy this XML node into the project file to reference the package.
<PackageVersion Include="SharpPulsar" Version="2.10.0-rc1132" />
                    
Directory.Packages.props
<PackageReference Include="SharpPulsar" />
                    
Project file
For projects that support Central Package Management (CPM), copy this XML node into the solution Directory.Packages.props file to version the package.
paket add SharpPulsar --version 2.10.0-rc1132
                    
#r "nuget: SharpPulsar, 2.10.0-rc1132"
                    
#r directive can be used in F# Interactive and Polyglot Notebooks. Copy this into the interactive tool or source code of the script to reference the package.
#:package SharpPulsar@2.10.0-rc1132
                    
#:package directive can be used in C# file-based apps starting in .NET 10 preview 4. Copy this into a .cs file before any lines of code to reference the package.
#addin nuget:?package=SharpPulsar&version=2.10.0-rc1132&prerelease
                    
Install as a Cake Addin
#tool nuget:?package=SharpPulsar&version=2.10.0-rc1132&prerelease
                    
Install as a Cake Tool

Build Tests

SharpPulsar

SharpPulsar is an Apache Pulsar Client built on top Akka.net, which can handle millions of Apache Pulsar Producers/Consumers (in theory).

What Is Akka.NET?

Akka.NET is a toolkit and runtime for building highly concurrent, distributed, and fault tolerant event-driven applications on .NET & Mono that is able to support up to 50 million msg/sec on a single machine, with small memory footprint and ~2.5 million actors(or Apache Pulsar Producers/Consumers) per GB of heap.

What Is Apache Pulsar?

Apache Pulsar is a cloud-native, distributed messaging and streaming platform that is able to support millions of topics while delivering high-throughput and low-latency performance.

Supported features

Client

  • TLS
  • Authentication (token, tls, OAuth2)
  • Multi-Hosts Service URL
  • Proxy
  • SNI Routing
  • Transactions
  • Subscription(Durable, Non-durable)
  • Cluster-level Auto Failover

Producer

  • Exclusive Producer
  • Partitioned Topics
  • Batching
  • Compression (LZ4, ZLIB, ZSTD, SNAPPY)
  • Schema (Primitive, Avro, Json, KeyValue, AutoSchema)
  • User-defined properties
  • Key-based batcher
  • Delayed/Scheduled messages
  • Interceptors
  • Message Router (RoundRobin, ConsistentHashing, Broadcast, Random)
  • End-to-end Encryption
  • Chunking
  • Transactions

Consumer

  • User-defined properties
  • HasMessageAvailable
  • Subscription Type (Exclusive, Failover, Shared, Key_Shared)
  • Subscription Mode (Durable, Non-durable)
  • Interceptors
  • Ack (Ack Individual, Ack Commulative, Batch-Index Ack)
  • Ack Timeout
  • Negative Ack
  • Dead Letter Policy
  • End-to-end Encryption
  • SubscriptionInitialPosition
  • Partitioned Topics
  • Batching
  • Compression (LZ4, ZLIB, ZSTD, SNAPPY)
  • Schema (Primitive, Avro, Json, KeyValue, AutoSchema)
  • Compacted Topics
  • Multiple Topics
  • Regex Consumer
  • Broker Entry Metadata

Reader

  • User-defined properties
  • HasMessageAvailable
  • Schema (Primitive, Avro, Json, KeyValue, AutoSchema)
  • Seek (MessageID, Timestamp)
  • Multiple Topics
  • End-to-end Encryption
  • Interceptors

TableView

  • Compacted Topics
  • Schema (All supported schema types)
  • Register Listener

Extras

  • Pulsar SQL
  • Pulsar Admin REST API
  • Function REST API
  • EventSource(Reader/SQL)
  • OpenTelemetry (ProducerOTelInterceptor, ConsumerOTelInterceptor)

Getting Started

Install the NuGet package SharpPulsar and follow the Tutorials.

//pulsar client settings builder
            var clientConfig = new PulsarClientConfigBuilder()
                .ServiceUrl("pulsar://localhost:6650");

            //pulsar actor system
            var pulsarSystem = PulsarSystem.GetInstance(clientConfig);

            var pulsarClient = pulsarSystem.NewClient();

            var consumer = pulsarClient.NewConsumer(new ConsumerConfigBuilder<sbyte[]>()
                .Topic(myTopic)
                .ForceTopicCreation(true)
                .SubscriptionName("myTopic-sub"));

            var producer = pulsarClient.NewProducer(new ProducerConfigBuilder<sbyte[]>()
                .Topic(myTopic));

            for (var i = 0; i < 10; i++)
            {
                var data = Encoding.UTF8.GetBytes($"tuts-{i}").ToSBytes();
                producer.NewMessage().Value(data).Send();
            }
			Thread.Sleep(TimeSpan.FromSeconds(5));
            for (var i = 0; i < 10; i++)
            {
                var message = (Message<sbyte[]>)consumer.Receive();
                consumer.Acknowledge(message);
                var res = Encoding.UTF8.GetString(message.Data.ToBytes());
                Console.WriteLine($"message '{res}' from topic: {message.TopicName}");
            }

Logical Types

Avro Logical Types are supported. Message object MUST implement ISpecificRecord

    AvroSchema<LogicalMessage> avroSchema = AvroSchema<LogicalMessage>.Of(ISchemaDefinition<LogicalMessage>.Builder().WithPojo(typeof(LogicalMessage)).WithJSR310ConversionEnabled(true).Build());

    public class LogicalMessage : ISpecificRecord
    {
        [LogicalType(LogicalTypeKind.Date)]
        public DateTime CreatedTime { get; set; }
		
        [LogicalType(LogicalTypeKind.TimestampMicrosecond)]
        public DateTime StampMicros { get; set; }

        [LogicalType(LogicalTypeKind.TimestampMillisecond)]
        public DateTime StampMillis { get; set; }
		
	[LogicalType(LogicalTypeKind.TimeMicrosecond)]
        public TimeSpan TimeMicros { get; set; }

        [LogicalType(LogicalTypeKind.TimeMillisecond)]
        public TimeSpan TimeMillis { get; set; }
        
        public AvroDecimal Size { get; set; }
		
        public string DayOfWeek { get; set; }

        [Ignore]
        public Avro.Schema Schema { get; set; }

        public object Get(int fieldPos)
        {
            switch (fieldPos)
            {
                case 0: return CreatedTime; 
	        case 1: return StampMicros;
                case 2: return StampMillis;
	        case 3: return TimeMicros;
                case 4: return TimeMillis;
                case 5: return Size;
                case 6: return DayOfWeek;
                default: throw new AvroRuntimeException("Bad index " + fieldPos + " in Get()");
            };
        }

        public void Put(int fieldPos, object fieldValue)
        {
            switch (fieldPos)
            {
                case 0: CreatedTime = (DateTime)fieldValue; break;
		case 1: StampMicros = (DateTime)fieldValue; break;
                case 2: StampMillis = (DateTime)fieldValue; break;
	        case 3: TimeMicros = (TimeSpan)fieldValue; break;
                case 4: TimeMillis = (TimeSpan)fieldValue; break;
                case 5: Size = (AvroDecimal)fieldValue; break;
                case 6: DayOfWeek = (String)fieldValue; break;
                default: throw new AvroRuntimeException("Bad index " + fieldPos + " in Put()");
            };
        }
    }

KeyValue Schema ALERT!!!!

Because I have become lazy and a lover of "peace of mind":

  • For schema type of KEYVALUESCHEMA:
    producer.NewMessage().Value<TK, TV>(data).Send();  
    
    OR
    producer.Send<TK, TV>(data);
    

TK, TV represents the key and value types of the KEYVALUESCHEMA respectively.

TableView

var topic = $"persistent://public/default/tableview-{DateTime.Now.Ticks}";
var count = 20;
var keys = await PublishMessages(topic, count, false);

var tv = await _client.NewTableViewBuilder(ISchema<string>.Bytes)
.Topic(topic)
.AutoUpdatePartitionsInterval(TimeSpan.FromSeconds(60))
.CreateAsync();
 
 Console.WriteLine($"start tv size: {tv.Size()}");
 tv.ForEachAndListen((k, v) => Console.WriteLine($"{k} -> {Encoding.UTF8.GetString(v)}"));
 await Task.Delay(5000);
 Console.WriteLine($"Current tv size: {tv.Size()}");

 tv.ForEachAndListen((k, v) => Console.WriteLine($"checkpoint {k} -> {Encoding.UTF8.GetString(v)}"));

OpenTelemetry

var exportedItems = new List<Activity>();
using var tracerProvider = Sdk.CreateTracerProviderBuilder()
.AddSource("producer", "consumer")
.SetResourceBuilder(ResourceBuilder.CreateDefault().AddService("inmemory-test"))
.AddInMemoryExporter(exportedItems)
.Build();

 var producerBuilder = new ProducerConfigBuilder<byte[]>()
 .Intercept(new ProducerOTelInterceptor<byte[]>("producer", _client.Log))
 .Topic(topic);

 var consumerBuilder = new ConsumerConfigBuilder<byte[]>()
 .Intercept(new ConsumerOTelInterceptor<byte[]>("consumer", _client.Log))
 .Topic(topic);

Cluster-level Auto Failover

var config = new PulsarClientConfigBuilder();
var builder = AutoClusterFailover.Builder().Primary(serviceUrl)
.Secondary(new List<string> { secondary })
.FailoverDelay(TimeSpan.FromSeconds(failoverDelay))
.SwitchBackDelay(TimeSpan.FromSeconds(switchBackDelay))
.CheckInterval(TimeSpan.FromSeconds(checkInterval));
config.ServiceUrlProvider(new AutoClusterFailover((AutoClusterFailoverBuilder)builder));

[Experimental]Running SharpPulsar Tests in docker container (the issue I have faced is how to create container from within a container)

You can run SharpPulsar tests in docker container. A Dockerfile and docker-compose file is provided at the root folder to help you run these tests in a docker container. docker-compose.yml:

version: "2.4"

services:
  akka-test:
    image: sharp-pulsar-test
    build: 
      context: .
    cpu_count: 1
    mem_limit: 1g
    environment:
      run_count: 2
      # to filter tests, uncomment
      # test_filter: "--filter FullyQualifiedName=SharpPulsar.Test.MessageChunkingTest"
      test_file: Tests/SharpPulsar.Test/SharpPulsar.Test.csproj

Dockerfile:

FROM mcr.microsoft.com/dotnet/sdk:6.0 
ENV test_file="Tests/SharpPulsar.Test/SharpPulsar.Test.csproj"
ENV test_filter=""
ENV run_count=2
RUN mkdir sharppulsar
COPY . ./sharppulsar
RUN ls
WORKDIR /sharppulsar
CMD ["/bin/bash", "-c", "x=1; c=0; while [ $x -le 1 ] && [ $c -le ${run_count} ]; do dotnet test ${test_file} ${test_filter} --framework net6.0 --logger trx; c=$(( $c + 1 )); if [ $? -eq 0 ]; then x=1; else x=0; fi;  done"]

How to:

cd into the root directory and execute docker-compose up run-count is the number of times you want the test repeated. test_filter is used when you need to test a specific test instead of running all the tests in the test suite.

License

This project is licensed under the Apache License Version 2.0 - see the LICENSE file for details.

Product Compatible and additional computed target framework versions.
.NET net5.0 is compatible.  net5.0-windows was computed.  net6.0 is compatible.  net6.0-android was computed.  net6.0-ios was computed.  net6.0-maccatalyst was computed.  net6.0-macos was computed.  net6.0-tvos was computed.  net6.0-windows was computed.  net7.0 was computed.  net7.0-android was computed.  net7.0-ios was computed.  net7.0-maccatalyst was computed.  net7.0-macos was computed.  net7.0-tvos was computed.  net7.0-windows was computed.  net8.0 was computed.  net8.0-android was computed.  net8.0-browser was computed.  net8.0-ios was computed.  net8.0-maccatalyst was computed.  net8.0-macos was computed.  net8.0-tvos was computed.  net8.0-windows was computed.  net9.0 was computed.  net9.0-android was computed.  net9.0-browser was computed.  net9.0-ios was computed.  net9.0-maccatalyst was computed.  net9.0-macos was computed.  net9.0-tvos was computed.  net9.0-windows was computed.  net10.0 was computed.  net10.0-android was computed.  net10.0-browser was computed.  net10.0-ios was computed.  net10.0-maccatalyst was computed.  net10.0-macos was computed.  net10.0-tvos was computed.  net10.0-windows was computed. 
.NET Core netcoreapp3.0 was computed.  netcoreapp3.1 was computed. 
.NET Standard netstandard2.1 is compatible. 
MonoAndroid monoandroid was computed. 
MonoMac monomac was computed. 
MonoTouch monotouch was computed. 
Tizen tizen60 was computed. 
Xamarin.iOS xamarinios was computed. 
Xamarin.Mac xamarinmac was computed. 
Xamarin.TVOS xamarintvos was computed. 
Xamarin.WatchOS xamarinwatchos was computed. 
Compatible target framework(s)
Included target framework(s) (in package)
Learn more about Target Frameworks and .NET Standard.

NuGet packages

This package is not used by any NuGet packages.

GitHub repositories

This package is not used by any popular GitHub repositories.

Version Downloads Last Updated
2.15.1 314 2/15/2024
2.15.0 163 2/3/2024
2.14.1 648 9/22/2023
2.14.0 387 7/12/2023
2.13.0 243 5/20/2023
2.12.1 222 5/10/2023
2.12.0 271 5/1/2023
2.11.2 339 3/19/2023
2.11.1 322 3/18/2023
2.11.0 333 3/5/2023
2.11.0-rc0117 182 3/5/2023
2.11.0-rc0112 218 2/2/2023
2.11.0-rc0107 211 2/1/2023
2.11.0-rc0105 195 1/18/2023
2.11.0-rc0097 217 1/15/2023
2.10.0-rc1193 221 1/15/2023
2.10.0-rc1191 913 10/6/2022
2.10.0-rc1186 213 10/4/2022
2.10.0-rc1162 280 9/14/2022
2.10.0-rc1158 262 8/9/2022
2.10.0-rc1157 232 8/9/2022
2.10.0-rc1150 222 8/8/2022
2.10.0-rc1136 308 7/23/2022
2.10.0-rc1135 249 7/23/2022
2.10.0-rc1134 244 7/23/2022
2.10.0-rc1133 243 7/23/2022
2.10.0-rc1132 210 7/21/2022
2.10.0-rc1130 265 7/21/2022
2.10.0-rc1129 245 7/17/2022
2.10.0-rc1125 279 7/10/2022
2.10.0-rc1123 245 7/9/2022
2.10.0-rc.1117 209 7/3/2022
2.10.0-rc.1108 246 6/25/2022
2.10.0-rc.1090 238 6/9/2022
2.10.0-rc.1088 197 6/9/2022
2.10.0-rc.1083 209 6/8/2022
2.10.0-rc.1070 208 6/3/2022
2.10.0-rc.1051 210 5/26/2022
2.10.0-rc.1037 210 5/16/2022
2.10.0-rc.1035 226 5/13/2022
2.10.0-rc.1034 219 5/13/2022
2.10.0-rc.1033 219 5/13/2022
2.10.0-rc.1032 219 5/13/2022
2.10.0-rc.1031 225 5/13/2022
2.10.0-rc.1022 324 4/2/2022
2.10.0-rc.1013 246 3/26/2022
2.9.0 1,566 2/21/2022
2.9.0-rc.975 257 2/13/2022
2.9.0-beta.971 231 2/13/2022
2.9.0-beta.47 245 1/9/2022
2.9.0-beta.45 236 12/29/2021
2.9.0-beta.44 237 12/21/2021
2.9.0-beta.43 247 12/19/2021
2.9.0-beta.1 228 2/13/2022
2.2.4 582 11/22/2021
2.2.4-beta.42 271 11/10/2021
2.2.4-beta.41 278 11/3/2021
2.2.4-beta.40 263 9/29/2021
2.2.4-beta 391 9/28/2021
2.2.3 556 9/22/2021
2.2.3-beta 404 9/21/2021
2.2.2 577 9/13/2021
2.2.2-beta 375 9/13/2021
2.2.1 630 9/11/2021
2.2.1-beta 424 9/11/2021
2.2.0 604 9/10/2021
2.2.0-beta 418 9/10/2021
2.1.0 592 9/5/2021
2.1.0-beta.33 287 9/5/2021
2.0.18 587 8/14/2021
2.0.0-beta.31 280 8/14/2021
2.0.0-beta.30 261 8/13/2021
2.0.0-beta.29 269 8/12/2021
2.0.0-beta.28 262 8/11/2021
2.0.0-beta.27 274 8/10/2021
2.0.0-beta.26 268 8/9/2021
2.0.0-beta.25 281 8/6/2021
2.0.0-beta.24 271 8/5/2021
2.0.0-beta.23 281 8/4/2021
2.0.0-beta.22 267 8/4/2021
2.0.0-beta.20 261 7/31/2021
2.0.0-beta.19 321 7/30/2021
2.0.0-beta.15 283 5/13/2021
2.0.0-beta.14 278 5/12/2021
2.0.0-beta.13 275 5/11/2021
2.0.0-beta.12 297 5/10/2021
2.0.0-beta.11 286 5/9/2021
2.0.0-beta.10 316 5/7/2021
2.0.0-beta.9 289 4/23/2021
2.0.0-beta.8 287 4/22/2021
2.0.0-beta.7 288 4/22/2021
2.0.0-beta.6 277 4/22/2021
2.0.0-beta.5 279 4/15/2021
2.0.0-beta.4 297 4/14/2021
2.0.0-beta 401 4/10/2021
1.4.2.1 774 9/3/2020
1.4.2 686 9/2/2020
1.4.1 702 8/29/2020
1.4.0 705 8/29/2020
1.4.0-release.1 225 2/13/2022
1.3.5 687 6/9/2020
1.3.4 678 6/9/2020
1.3.3 719 6/8/2020
1.3.2 678 6/8/2020
1.3.1 838 6/5/2020
1.3.0 739 6/3/2020
1.2.0 874 5/26/2020
1.1.0 773 5/26/2020
1.0.0 774 5/23/2020
0.9.0 734 5/21/2020
0.8.5 690 5/20/2020
0.8.4 761 5/9/2020
0.8.3 697 5/8/2020
0.8.2 735 5/2/2020
0.8.1 728 4/30/2020
0.8.0 775 4/28/2020
0.7.0 745 4/20/2020
0.6.5 685 4/16/2020
0.6.4 758 4/15/2020
0.6.3 733 4/14/2020
0.6.2 705 4/14/2020
0.6.1 721 4/13/2020
0.6.0 756 4/12/2020
0.5.3 906 4/5/2020
0.5.2 730 3/30/2020
0.5.1 844 3/28/2020
0.5.0 719 3/27/2020
0.4.0 759 3/17/2020
0.3.0 718 3/13/2020
0.2.0 698 3/11/2020
0.0.1.1 746 3/7/2020
0.0.1 744 3/7/2020
0.0.1-alpha 534 3/8/2020

Added OpenTelemetry