kafka 연동

μ•ˆλ…•ν•˜μ„Έμš”.

ν˜„μž¬ .net Framework ν”„λ‘œμ νŠΈμ— kafkaλ₯Ό λ°”λ‘œ 연동이 λΆˆκ°€ ν•˜μ—¬ ν”„λ‘œμ νŠΈλ₯Ό ν•˜λ‚˜ 더 μƒμ„±ν•˜μ—¬ .net Core ν”„λ‘œμ νŠΈλ₯Ό μ°Έμ‘°ν•˜λŠ” ν˜•νƒœλ‘œ 연동 ν…ŒμŠ€νŠΈλ₯Ό μ§„ν–‰ν•˜κ³  μžˆμŠ΅λ‹ˆλ‹€.

Producer, Consumer ν…ŒμŠ€νŠΈ μ§„ν–‰μ—λŠ” 이상이 μ—†λŠ”λ° Consumer μ—μ„œ 계속 데이터λ₯Ό μˆ˜μ§‘ν•˜λŠ” ν˜•μ‹μ΄ μ•„λ‹ˆλΌ μˆ˜μ§‘ μ‹œ μΆ”κ°€ μˆ˜μ§‘ 데이터가 μ—†μœΌλ©΄ 그만 λ©ˆμΆ”κ³  λ‹€μ‹œ μ œκ°€ 원할 λ•Œ ν˜ΈμΆœν•˜μ—¬ μˆ˜μ§‘μ„ ν•˜λŠ” ν˜•μ‹μœΌλ‘œ ν•˜κ³  싢은데 방법이 μ—†μ„κΉŒμš”?

try
        {
            Console.WriteLine("--------------- Start Consumer ---------------");

            var config = new ConsumerConfig
            {
                BootstrapServers = "broker01,broker02,broker03",
                //ClientId = Dns.GetHostName(),
                GroupId = "GroupId",
                AutoOffsetReset = AutoOffsetReset.Earliest,
                EnableAutoCommit = true
            };

            using(var consumer = new ConsumerBuilder<Ignore, string>(config).Build())
            {
                consumer.Subscribe("topic");

                CancellationTokenSource cts = new CancellationTokenSource();

                try
                {
                    while(!cts.IsCancellationRequested)
                    //while (true)
                    {
                        try
                        {
                            var consumerResult = consumer.Consume(cts.Token);   //  get consume result
                           
                            if(consumerResult.Message == null)
                            //if (consumerResult.IsPartitionEOF)
                            {
                                Console.WriteLine("--------------- IsPartitionEOF ---------------");
                                consumer.Close();
                                break;
                            }
                            //consumer.Commit(consumerResult);
                        }
                        catch (ConsumeException cex)
                        {
                            Console.WriteLine("consumerException : " + cex.Message);
                        }
                        catch(Exception ex)
                        {
                            Console.WriteLine("consumer exception : " + ex.Message);
                        }
                    }
                }
                catch (OperationCanceledException ocex)
                {
                    Console.WriteLine($"OperationCanceledException : {ocex.Message}");
                    consumer.Close();
                }
                catch(Exception ex)
                {
                    Console.WriteLine($"exception : {ex.Message}");
                    consumer.Close();
                }
                finally
                {
                    consumer.Close();
                }

                Console.WriteLine("--------------- End Consumer -------------- - ");
            }
        }
        catch (Exception ex)
        {
            Console.WriteLine($"ex : {ex.Message}");
        }
μ’‹μ•„μš” 2

μΉ΄ν”„μΉ΄μΉ΄ λ©”μ‹œμ§• 기반 κ²Œμ‹œ/ꡬ독 λ©”μ‹œμ§€ λΈŒλ‘œμ»€κ΅°μš”. μ €λŠ” μ‚¬μš© κ²½ν—˜μ΄ μ—†μŠ΅λ‹ˆλ‹€.

κ·Έλ‚˜μ €λ‚˜ λ©”μ‹œμ§€λ₯Ό λ°›μœΌλ©΄ κ΅¬λ…ν•œ 것을 μ·¨μ†Œν•˜κ³  ν•„μš”ν•  λ•Œ λ‹€μ‹œ ꡬ독 ν•˜λ©΄ 될 것 같은데, κ·Έλ ‡κ²Œ μ²˜λ¦¬κ°€ μ•ˆλ˜λ‚˜μš”?

μ’‹μ•„μš” 2

메세지가 없을 λ•Œ ꡬ독을 λŠλŠ” μ΄μœ λŠ” λ¬΄μ—‡μΈκ°€μš”?

μ’‹μ•„μš” 1

ν•œκ±΄μ”© 계속 λ©”μ†Œλ“œλ₯Ό ν˜ΈμΆœν•˜λŠ”κ²ƒλ³΄λ‹€λŠ” ν•΄λ‹Ή λ©”μ†Œλ“œ μ•ˆμ—μ„œ λ‹€ κ°€μ Έμ˜€κ³  μ‹Άμ–΄μ„œ μž…λ‹ˆλ‹€.

μ’‹μ•„μš” 1

νŠΉμ • μ›ν•˜λŠ” κ²½μš°μ—λ§Œ ν˜ΈμΆœν•˜μ—¬ μ²˜λ¦¬ν•˜κ³  μ‹Άμ–΄μ„œμš”

μ’‹μ•„μš” 1

일반적인 상황이라면 @dimohy λ‹˜κ»˜μ„œ μ œμ•ˆν•˜μ‹  λ°©λ²•μœΌλ‘œ μ²˜λ¦¬ν•˜λ©΄ 해결될 것 κ°™μ•„μš”.
그런데 λ©”μ„œλ“œ ν•˜λ‚˜ μž‘μ„±ν•˜κ³  이둜써 ν†΅μ œν•˜κΈΈ μ›ν•˜μ‹ λ‹€λ©΄ λ©”μ‹œμ§€ 큐 라이브러리λ₯Ό μ‚¬μš©ν•˜λŠ” 것은 λΆ€μ μ ˆν•΄ λ³΄μž…λ‹ˆλ‹€.
μ‚¬μš©λͺ©μ μ΄ 많이 λ‹€λ₯Έ 것 같이 λŠκ»΄μ§€κΈ° λ•Œλ¬Έμž…λ‹ˆλ‹€.

μ§ˆλ¬Έμžλ‹˜κ»˜μ„œ μ›ν•˜μ‹ λŒ€λ‘œ λ™μž‘ν•˜λ €λ©΄ 직접 κ΅¬ν˜„ν•˜λŠ” 것이 제일 λΉ λ₯Όκ±°μ—μš”.
ConcurrentQueueλ₯Ό ν•„λ“œλ‘œ κ°–λŠ” Service 클래슀λ₯Ό ν•˜λ‚˜ λ§Œλ“€μ–΄λ†“κ³  이λ₯Ό μ‹±κΈ€ν†€μœΌλ‘œ μš΄μ˜ν•˜λ©΄μ„œ Build() λ˜λŠ” Run() λ©”μ„œλ“œμ— μ›ν•˜λŠ” κΈ°λŠ₯이 λ“€μ–΄κ°€λ©΄ 될 것 κ°™μ•„μš”.

μ’‹μ•„μš” 2

μΉ΄ν”„μΉ΄μ˜ μ˜€ν”„μ…‹κ³Ό ν† ν”½ λ‘κ°œλ₯Ό ν™œμš©ν•˜λ©΄ κ°€λŠ₯ν• λ“―ν•˜λ„€μš”

μΉ΄ν”„μΉ΄λŠ” 전솑보μž₯κΈ°λŠ₯이 μžˆκΈ°λ•Œλ¬Έμ—, μ†ŒλΉ„ν•˜λŠ” μͺ½μ—μ„œ κ΅¬κ°„λ§ŒνΌ 읽고 λ§ˆμ§€λ§‰ 읽은 μ €μž₯ꡬ간에 체크포인트 κ±Έκ³  λ‹€μŒ μ½μ„λ•Œ λ‹€μŒ μœ„μΉ˜μ—μ„œ 읽기가λŠ₯ν•˜λ©° , 주둜 μ†ŒλΉ„μžκ°€ 잘λͺ»λ˜μ—ˆμ„λ•Œ λ³΅κ΅¬μ²˜λ¦¬μ— μ‚¬μš©λ˜λ©° ,이 처리λ₯Ό ν† ν”½ 1둜 이벀트 μˆœμ°¨μ²˜λ¦¬μš©λ„λ‘œ ν™œμš©ν•˜κ³ 

생산이 νŠΉμ •κ°œμˆ˜ ν˜Ήμ€ νŠΉμ •νƒ€μ΄λ° λ•Œ 이것을 μ²˜λ¦¬ν•΄λ₯Ό ν† ν”½2둜 μƒμ‚°ν•œν›„ λ‚΄κ°€μ›ν•˜λŠ” 타이밍을 토픽에 μƒμ‚°μ„ν•˜μ—¬ κ΅¬λ…ν•˜λŠ” μͺ½μ—μ„œ ν† ν”½1의 κ΅¬κ°„μ²˜λ¦¬λ₯Ό ν•˜λ©΄ κ°€λŠ₯ν• κ²ƒμœΌλ‘œ λ³΄μž…λ‹ˆλ‹€.

λ˜λŠ” ν† ν”½ ν•˜λ‚˜λͺ¨λ“œλ‘œ 더 λ‹¨μˆœν™”ν•˜λ©΄ 타이머가 주기적으둜 μ˜€ν”„μ…‹μ„ μ²΄ν¬ν•˜μ—¬ μ¦κ°€ν•œ κ΅¬κ°„μ²˜λ¦¬λ§Œ ν•˜λ©΄ λ κ²ƒμœΌλ‘œ λ³΄μž…λ‹ˆλ‹€.

핡심은 μ‹€μ‹œκ°„ κ΅¬λ…μ΄μ•„λ‹ˆμ—¬λ„λ˜λ©°, μΉ΄ν”„μΉ΄κ°€ λ³΄κ΄€ν•˜λŠ” κΈ°κ°„λ§ŒνΌ μ›ν•˜λŠ” μ˜€ν”„μ…‹ ꡬ간 μ²˜λ¦¬κ°€λŠ₯ν•˜λ‹€λž€κ²ƒμž…λ‹ˆλ‹€.

μ°Έκ³  : OffSet을 κ΄€λ¦¬ν•˜κ³ , ꡬ간탐색기λŠ₯ μ‚¬μš©ν•˜μ—¬ 배치처리기처럼 κ΅¬ν˜„

            var topicPartitionOffset = new TopicPartitionOffset("Test_1", new Partition(0), new Offset(278));

            consumer.Assign(topicPartitionOffset);
            

링크 : How to Consume from specific TopicPartitionOffset with Confluent.Kafka in .Net - Stack Overflow

μ’‹μ•„μš” 4