μλ νμΈμ.
νμ¬ .net Framework νλ‘μ νΈμ kafkaλ₯Ό λ°λ‘ μ°λμ΄ λΆκ° νμ¬ νλ‘μ νΈλ₯Ό νλ λ μμ±νμ¬ .net Core νλ‘μ νΈλ₯Ό μ°Έμ‘°νλ ννλ‘ μ°λ ν μ€νΈλ₯Ό μ§ννκ³ μμ΅λλ€.
Producer, Consumer ν μ€νΈ μ§νμλ μ΄μμ΄ μλλ° Consumer μμ κ³μ λ°μ΄ν°λ₯Ό μμ§νλ νμμ΄ μλλΌ μμ§ μ μΆκ° μμ§ λ°μ΄ν°κ° μμΌλ©΄ κ·Έλ§ λ©μΆκ³ λ€μ μ κ° μν λ νΈμΆνμ¬ μμ§μ νλ νμμΌλ‘ νκ³ μΆμλ° λ°©λ²μ΄ μμκΉμ?
try
{
Console.WriteLine("--------------- Start Consumer ---------------");
var config = new ConsumerConfig
{
BootstrapServers = "broker01,broker02,broker03",
//ClientId = Dns.GetHostName(),
GroupId = "GroupId",
AutoOffsetReset = AutoOffsetReset.Earliest,
EnableAutoCommit = true
};
using(var consumer = new ConsumerBuilder<Ignore, string>(config).Build())
{
consumer.Subscribe("topic");
CancellationTokenSource cts = new CancellationTokenSource();
try
{
while(!cts.IsCancellationRequested)
//while (true)
{
try
{
var consumerResult = consumer.Consume(cts.Token); // get consume result
if(consumerResult.Message == null)
//if (consumerResult.IsPartitionEOF)
{
Console.WriteLine("--------------- IsPartitionEOF ---------------");
consumer.Close();
break;
}
//consumer.Commit(consumerResult);
}
catch (ConsumeException cex)
{
Console.WriteLine("consumerException : " + cex.Message);
}
catch(Exception ex)
{
Console.WriteLine("consumer exception : " + ex.Message);
}
}
}
catch (OperationCanceledException ocex)
{
Console.WriteLine($"OperationCanceledException : {ocex.Message}");
consumer.Close();
}
catch(Exception ex)
{
Console.WriteLine($"exception : {ex.Message}");
consumer.Close();
}
finally
{
consumer.Close();
}
Console.WriteLine("--------------- End Consumer -------------- - ");
}
}
catch (Exception ex)
{
Console.WriteLine($"ex : {ex.Message}");
}