List의 항목들을 특정 카운트 기준으로 나누고 병렬로 작업하는 방법
C#에서 List의 항목들을 특정 카운트 기준으로 나누고 병렬로 작업하는 방법은 Parallel.ForEach나 Task와 같은 멀티스레딩을 활용하여 작업을 병렬 처리할 수 있습니다. 주로 작업을 나누기 위해 Chunk 또는 Partitioner를 사용할 수 있습니다. 아래는 그 예시입니다.
1. Parallel.ForEach와 Partitioner 사용
Partitioner를 사용하면 리스트를 일정한 크기로 나눈 후 병렬로 작업할 수 있습니다.
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;
class Program
{
static void Main(string[] args)
{
List<int> items = new List<int>();
for (int i = 0; i < 100; i++) items.Add(i);
int chunkSize = 10;
// Partition the list into chunks of the specified size
var partitioner = Partitioner.Create(items, EnumerablePartitionerOptions.NoBuffering)
.GetPartitions(items.Count / chunkSize);
// Parallel processing with partitioned data
Parallel.ForEach(partitioner, partition =>
{
using (partition)
{
while (partition.MoveNext())
{
ProcessItem(partition.Current);
}
}
});
}
static void ProcessItem(int item)
{
// Simulate some work
Console.WriteLine($"Processing item {item}");
}
}
2. List를 Chunk로 나누고 Parallel.ForEach로 처리
.NET 6 이상에서는 Chunk 메서드를 사용해 리스트를 일정 크기로 나눈 후 병렬로 처리할 수 있습니다.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
class Program
{
static void Main(string[] args)
{
List<int> items = Enumerable.Range(1, 100).ToList();
int chunkSize = 10;
// List를 chunk로 나누고 병렬로 처리
Parallel.ForEach(items.Chunk(chunkSize), chunk =>
{
foreach (var item in chunk)
{
ProcessItem(item);
}
});
}
static void ProcessItem(int item)
{
// 작업 처리
Console.WriteLine($"Processing item {item}");
}
}
3. Task를 사용한 병렬 작업
Task를 이용하여 항목을 나누어 병렬 작업을 수행할 수도 있습니다.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
class Program
{
static async Task Main(string[] args)
{
List<int> items = Enumerable.Range(1, 100).ToList();
int chunkSize = 10;
// List를 chunk로 나눈 후 각각을 Task로 병렬 처리
var tasks = items
.Chunk(chunkSize)
.Select(chunk => Task.Run(() =>
{
foreach (var item in chunk)
{
ProcessItem(item);
}
})).ToArray();
// 모든 Task가 완료될 때까지 대기
await Task.WhenAll(tasks);
}
static void ProcessItem(int item)
{
// 작업 처리
Console.WriteLine($"Processing item {item}");
}
}
이 예제들은 리스트의 항목들을 특정 크기(chunk)로 나누어 병렬로 처리하는 방법을 보여줍니다. 이 방식으로 많은 데이터를 나누어 효율적으로 처리할 수 있습니다.
Partitioner를 사용하여 병렬 작업을 진행하는 중에 작업을 취소하거나 순번을 처리하는 방법은 다음과 같이 구현할 수 있습니다.
1. 작업 취소 기능 구현 (CancellationToken 사용)
C#에서 작업을 취소하려면 CancellationToken을 사용해야 합니다. Parallel.ForEach나 Task에서 작업 도중 CancellationToken이 설정되면 중간에 작업을 취소할 수 있습니다.
2. 작업 순번 처리
병렬 작업을 수행할 때 순번을 유지하려면 Partitioner 또는 Parallel.ForEach에서 인덱스 정보를 활용할 수 있습니다. 이를 통해 각 작업의 순번을 추적할 수 있습니다.
다음은 CancellationToken과 작업 순번을 함께 처리하는 방법의 예시입니다.
Parallel.ForEach와 Partitioner를 사용한 작업 취소 및 순번 처리
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
class Program
{
static void Main(string[] args)
{
List<int> items = Enumerable.Range(1, 100).ToList();
int chunkSize = 10;
// 작업 취소를 위한 CancellationTokenSource
CancellationTokenSource cts = new CancellationTokenSource();
// 사용자 입력을 통해 작업 취소
Task.Run(() =>
{
Console.WriteLine("Press Enter to cancel the operation.");
Console.ReadLine();
cts.Cancel(); // 취소 신호 전송
});
try
{
// Partition을 통해 리스트를 나누고 병렬 작업 수행
var partitioner = Partitioner.Create(items, EnumerablePartitionerOptions.NoBuffering)
.GetPartitions(items.Count / chunkSize);
Parallel.ForEach(partitioner, (partition, state) =>
{
using (partition)
{
while (partition.MoveNext())
{
// 작업이 취소되었는지 확인
if (cts.Token.IsCancellationRequested)
{
Console.WriteLine("Operation was cancelled.");
state.Stop(); // 병렬 작업을 중단
break;
}
// 순번 처리를 위해 인덱스 계산
int index = items.IndexOf(partition.Current);
ProcessItem(partition.Current, index);
}
}
});
}
catch (OperationCanceledException)
{
Console.WriteLine("Task was cancelled.");
}
}
static void ProcessItem(int item, int index)
{
// 순번과 함께 작업 처리
Console.WriteLine($"Processing item {item} at index {index}");
// 작업 처리 시뮬레이션
Thread.Sleep(100); // 처리 시간을 시뮬레이션
}
}
주요 구현 내용:
1. 작업 취소 기능 (CancellationTokenSource)
CancellationTokenSource를 사용해 작업 도중 취소할 수 있도록 처리합니다.
Parallel.ForEach 안에서 CancellationToken을 확인하고, 작업이 취소된 경우 state.Stop()을 호출하여 병렬 작업을 중단합니다.
2. 작업 순번 처리
items.IndexOf(partition.Current)를 사용하여 현재 처리 중인 항목의 순번을 계산합니다.
ProcessItem 메서드에서 순번을 함께 출력하여 작업 순서를 확인할 수 있습니다.
3. Task와 Chunk를 활용한 방법
또한, Task와 Chunk를 사용한 방식에서도 작업 취소와 순번 처리를 적용할 수 있습니다.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
class Program
{
static async Task Main(string[] args)
{
List<int> items = Enumerable.Range(1, 100).ToList();
int chunkSize = 10;
// 작업 취소를 위한 CancellationTokenSource
CancellationTokenSource cts = new CancellationTokenSource();
// 사용자 입력을 통해 작업 취소
Task.Run(() =>
{
Console.WriteLine("Press Enter to cancel the operation.");
Console.ReadLine();
cts.Cancel(); // 취소 신호 전송
});
try
{
// 병렬로 Task를 실행하면서 작업 순번을 포함
var tasks = items
.Select((item, index) => new { item, index })
.Chunk(chunkSize)
.Select(chunk => Task.Run(() =>
{
foreach (var entry in chunk)
{
// 작업이 취소되었는지 확인
if (cts.Token.IsCancellationRequested)
{
Console.WriteLine("Operation was cancelled.");
break; // 작업 중단
}
// 순번과 함께 작업 처리
ProcessItem(entry.item, entry.index);
}
}, cts.Token)).ToArray();
// 모든 Task가 완료될 때까지 대기
await Task.WhenAll(tasks);
}
catch (OperationCanceledException)
{
Console.WriteLine("Task was cancelled.");
}
}
static void ProcessItem(int item, int index)
{
// 순번과 함께 작업 처리
Console.WriteLine($"Processing item {item} at index {index}");
// 작업 처리 시뮬레이션
Thread.Sleep(100); // 처리 시간을 시뮬레이션
}
}
요약
작업 취소: CancellationToken을 사용하여 병렬 작업을 취소할 수 있으며, Parallel.ForEach나 Task에서 취소 요청을 확인하여 중단합니다.
작업 순번 처리: 리스트에서 IndexOf 또는 인덱스 계산을 통해 각 항목의 순번을 추적할 수 있습니다.