작업 결과를 캐시하고 재사용하는 구현을 추가하겠습니다. IDistributedCache를 사용하여 분산 환경에서도 동작하도록 구성하겠습니다.
/// <summary>
/// 캐시된 작업 결과를 저장하는 클래스
/// </summary>
public class CachedWorkResult
{
public WorkResponse Response { get; set; }
public DateTime CachedTime { get; set; }
public string RequestHash { get; set; }
}
/// <summary>
/// 작업 요청에 대한 캐시 키를 생성하는 Helper 클래스
/// </summary>
public static class WorkRequestHashGenerator
{
public static string GenerateHash(WorkRequest request)
{
using (var sha256 = SHA256.Create())
{
var data = JsonSerializer.Serialize(new
{
request.ActionType,
request.Data,
Parameters = request.Parameters.OrderBy(x => x.Key)
});
var bytes = Encoding.UTF8.GetBytes(data);
var hash = sha256.ComputeHash(bytes);
return Convert.ToBase64String(hash);
}
}
}
public class WorkerManagerService : IWorkerManagerService
{
private readonly IWorkerClient _workerClient;
private readonly WorkQueue _workQueue;
private readonly Dictionary<string, TaskCompletionSource<WorkResponse>> _pendingTasks;
private readonly SemaphoreSlim _workerSemaphore;
private readonly ConcurrentDictionary<string, WorkItemStatus> _activeWorks;
private readonly IDistributedCache _cache;
private readonly ILogger<WorkerManagerService> _logger;
private const int MaxConcurrentWorkers = 3;
public WorkerManagerService(
IWorkerClient workerClient,
IDistributedCache cache,
ILogger<WorkerManagerService> logger)
{
_workerClient = workerClient;
_cache = cache;
_logger = logger;
_workQueue = new WorkQueue();
_pendingTasks = new Dictionary<string, TaskCompletionSource<WorkResponse>>();
_workerSemaphore = new SemaphoreSlim(MaxConcurrentWorkers);
_activeWorks = new ConcurrentDictionary<string, WorkItemStatus>();
}
public async Task<WorkResponse> ProcessRequestAsync(WorkRequest request, WorkPriority priority = WorkPriority.Normal)
{
// 캐시 키 생성
var cacheKey = WorkRequestHashGenerator.GenerateHash(request);
// 캐시된 결과 확인
var cachedResult = await GetCachedResultAsync(cacheKey);
if (cachedResult != null)
{
_logger.LogInformation("Cache hit for request {CacheKey}", cacheKey);
return cachedResult.Response;
}
var taskCompletionSource = new TaskCompletionSource<WorkResponse>();
var requestId = Guid.NewGuid().ToString();
var queuedWork = new QueuedWork
{
RequestId = requestId,
Request = request,
Priority = priority,
EnqueueTime = DateTime.UtcNow,
RetryCount = 0,
CacheKey = cacheKey // 캐시 키 저장
};
if (request.ActionType == "Async")
{
_workQueue.Enqueue(queuedWork);
_ = ProcessQueueAsync();
return new WorkResponse
{
Success = true,
Result = $"작업이 큐에 등록되었습니다. RequestId: {requestId}",
ResponseTime = DateTime.UtcNow
};
}
lock (_pendingTasks)
{
_pendingTasks[requestId] = taskCompletionSource;
}
_workQueue.Enqueue(queuedWork);
_ = ProcessQueueAsync();
using var cts = new CancellationTokenSource(TimeSpan.FromMinutes(5));
try
{
var response = await taskCompletionSource.Task.WaitAsync(cts.Token);
response.ResponseTime = DateTime.UtcNow;
// 성공적인 결과를 캐시에 저장
await CacheResultAsync(cacheKey, response);
return response;
}
catch (OperationCanceledException)
{
throw new TimeoutException("Worker processing timed out");
}
}
private async Task<CachedWorkResult> GetCachedResultAsync(string cacheKey)
{
try
{
var cachedData = await _cache.GetAsync(cacheKey);
if (cachedData == null) return null;
var cachedResult = JsonSerializer.Deserialize<CachedWorkResult>(cachedData);
// 캐시 만료 체크 (예: 1시간)
if (DateTime.UtcNow.Subtract(cachedResult.CachedTime).TotalHours > 1)
{
await _cache.RemoveAsync(cacheKey);
return null;
}
return cachedResult;
}
catch (Exception ex)
{
_logger.LogError(ex, "Error retrieving cached result");
return null;
}
}
private async Task CacheResultAsync(string cacheKey, WorkResponse response)
{
try
{
var cacheResult = new CachedWorkResult
{
Response = response,
CachedTime = DateTime.UtcNow,
RequestHash = cacheKey
};
var options = new DistributedCacheEntryOptions
{
AbsoluteExpirationRelativeToNow = TimeSpan.FromHours(1), // 1시간 후 만료
SlidingExpiration = TimeSpan.FromMinutes(30) // 30분 동안 접근이 없으면 만료
};
var serializedData = JsonSerializer.Serialize(cacheResult);
await _cache.SetAsync(cacheKey, Encoding.UTF8.GetBytes(serializedData), options);
}
catch (Exception ex)
{
_logger.LogError(ex, "Error caching result");
}
}
// 캐시 무효화를 위한 메서드
public async Task InvalidateCacheAsync(string cacheKey)
{
await _cache.RemoveAsync(cacheKey);
}
}
// Startup.cs 또는 Program.cs에 캐시 설정 추가
public void ConfigureServices(IServiceCollection services)
{
// Redis 캐시 설정
services.AddStackExchangeRedisCache(options =>
{
options.Configuration = Configuration.GetConnectionString("Redis");
options.InstanceName = "WorkerCache:";
});
// 또는 인메모리 캐시 설정 (개발 환경용)
// services.AddDistributedMemoryCache();
}
이 구현의 주요 특징은 다음과 같습니다:
1. **캐시 키 생성**
- 작업 요청의 내용을 기반으로 고유한 해시 생성
- ActionType, Data, Parameters를 포함하여 동일 작업 판별
2. **캐시 처리 로직**
- 작업 실행 전 캐시 확인
- 캐시 히트 시 즉시 결과 반환
- 캐시 미스 시 작업 실행 후 결과 캐싱
3. **캐시 설정**
- 절대 만료 시간: 1시간
- 슬라이딩 만료: 30분 (접근이 없을 경우)
- Redis 또는 인메모리 캐시 선택 가능
4. **캐시 무효화**
- 필요시 특정 캐시 항목 삭제 가능
- 캐시 만료 시간 초과시 자동 삭제
사용 예시:
```csharp
// 컨트롤러에서의 사용
[HttpPost("process")]
public async Task<IActionResult> ProcessRequest([FromBody] WorkRequest request)
{
var result = await _workerManagerService.ProcessRequestAsync(request);
return Ok(result);
}
// 캐시 무효화가 필요한 경우
[HttpPost("invalidate-cache")]
public async Task<IActionResult> InvalidateCache(string cacheKey)
{
await _workerManagerService.InvalidateCacheAsync(cacheKey);
return Ok();
}
```
이렇게 구현하면:
1. 동일한 작업 요청에 대해 캐시된 결과를 즉시 반환
2. 시스템 부하 감소
3. 응답 시간 개선
4. 분산 환경에서도 캐시 공유 가능
캐시 설정(만료 시간 등)은 요구사항에 따라 조정할 수 있습니다.
카테고리 없음