병렬처리 질문 드립니다 !

약 30만개 정도 되는 CSV 파일을 Datatable 에 정리하려고 합니다.

한개의 csv 파일에는 여러개의 row data 가 있고 이를 한개의 row data 로 합친다음
생성한 dt에 적재하는 방식 입니다.

 Parallel.ForEach(File_path_lists, (File_name) =>
        {
            DataRow dr = dt.NewRow(); 
            Data_lists = Read_csv(File_name); 

            dr["Path"] = Data_lists[0];
            dr["Open"] = Data_lists[1];
            dr["High"] = Data_lists[2];
            dr["Low"] = Data_lists[3];
            dr["Close"] = Data_lists[4];
            dr["Volume"] = Data_lists[5];
            dr["OpenInt"] = Data_lists[6];
            dt.Rows.Add(dr);
         }

해당 코드에서 발생된 문제는…

  1. ‘DataTable internal index is corrupted’ 오류 발생
  2. 중복된 row data가 적재되는 현상

입니다.

해당 오류를 검색해보니 병렬처리 자체가 datatable 에는 매우 불안정하다 라고 하던데…
해결하기 위해서 lock 을 사용했지만 그냥 foreach 하는거랑 시간적으로 크게 차이는 없는것으로
확인했습니다.

혹시 다른방법이 있을까요 ?

2개의 좋아요

Microsoft Learn 사이트의 DataTable 클래스에 대한 스레스 안정성은 아래와 같이 설명되어 있습니다.

스레드 보안

이 형식은 다중 스레드 읽기 작업에 안전합니다. 모든 쓰기 작업을 동기화해야 합니다.
DataTable 클래스 (System.Data) | Microsoft Learn

@js_choi 님께서 언급하신

병렬처리 자체가 datatable 에는 매우 불안정하다

라고 보기 보다는 DataTable 클래스는 쓰기 작업에 대한 스레드 안전장치를 제공하지 있지 않으므로 병렬작업을 하려면 무조건 동기화를 하라는 말입니다.

그리고 현재 시나리오에서의 병렬 처리는 어떤 부분에 대한 동시성을 가져갈 것인지 좀 더 고민 해볼 필요가 있을 것 같습니다. 일반적으로 파일 I/O에 대한 병렬처리는 기대만큼 이득이 없을 수도 있습니다. 단위 작업 구간에 적절히 Producer-Consumer 패턴을 적용해야 할 것입니다. 이 부분에 대해서는 다음 분께서 답변 부탁드립니다ㅎ

또한 단순히 읽는 시간을 단축하기위해서 병렬처리를 고려하시는 것이라면 30만개 파일을 개별 파일을 합쳐서 처리하는 방향도 고려할 수 있습니다. 30만개 파일 내 내용이 각각 100바이트이고 그 합이 30메가바이트라고 할 때, 100바이트 파일 30만개를 처리하는 것 보다 30메가바이트 파일 1개를 처리하는 것이 훨씬 유리합니다.

5개의 좋아요

답변 감사합니다! 쓰기에서 동기화 한번 시도해보겠습니다
말씀하신대로 개별파일 합치는 방법도 구현해보겠습니다 !!

1개의 좋아요

병렬처리 할 때 항상 고려해야할 것 중 하나가 데이터를 병렬로 처리하고 한곳에 모은다거나 할 때
사용하고자 하시는 자료구조가 병렬처리에 대한 동기화를 지원하는지 항상 확인하셔야 합니다

문제의 원인은 @al6uiz 님께서 자세히 설명해주셨지만
dt.Rows.Add(dr); 구문에서 여러개의 쓰레드가 동시에 Add()를 하면서 예상할 수 없는 문제가 발생하는 겁니다.
(근데 왜 같은 데이터가 적재되는진 잘 모르겠네요… 여러번 호출하시나용? ㅋㅋㅋ)

dt.Rows.Add()의 코드를 확인해보셔도 동기화에 대한 내용이 없는 것을 확인하실 수 있습니다.
(Reference Source)

이럴 경우 말씀하신 lock등을 사용해서 동기화를 해줘야하는데…
lock등의 동기화를 사용하면 코드가 2~3배 느려지는게 아니고 10~20배 정도 느려진다고 하던데 참 딜레마입니다.
이렇게 되면 빠르게 처리하려고 병렬처리를 도입했던 이유가 사라지니 말입니다.
모든 쓰레드를 동기화하는 lock을 사용하지 않고 lock을 잡지 않는 lockfree의 개념도 있으니 관심있으면 확인해보시면 될 것 같습니다.

참고로 동기화가 적용된 queue인 ConcurrentQueue가 Enqueue하는 방식은 다음과 같습니다
(Reference Source)

2개의 좋아요