worker
각 프로젝트의 Program.cs에 필요한 서비스 등록은 다음과 같습니다:
1. Blazor SSR Client (BlazorApp.Client):
```csharp
builder.Services.AddHttpClient<IWorkerManagerService, WorkerManagerService>(client =>
{
client.BaseAddress = new Uri("https://worker-manager-api/");
});
```
2. Worker Manager (WorkerManager.API):
```csharp
builder.Services.AddHttpClient<IWorkerClient, WorkerClient>(client =>
{
client.BaseAddress = new Uri("https://worker-api/");
});
builder.Services.AddScoped<IWorkerManagerService, WorkerManagerService>();
builder.Services.AddScoped<IWorkerClient, WorkerClient>();
```
3. Worker (Worker.API):
```csharp
builder.Services.AddScoped<ICustomActionService, CustomActionService>();
builder.Services.AddSingleton<IBackgroundTaskQueue, BackgroundTaskQueue>();
builder.Services.AddHostedService<QueuedHostedService>();
```
주요 특징:
1. Worker Manager는 요청을 큐에 저장하고 동시성을 제어합니다.
2. Worker는 동기/비동기 두 가지 방식으로 작업을 처리할 수 있습니다:
- 동기(Sync): 작업을 즉시 실행하고 결과를 반환
- 비동기(Async): 백그라운드 큐에 작업을 등록하고 즉시 응답
필요한 추가 기능이나 수정사항이 있으시다면 말씀해 주세요.
// 1. Blazor SSR Client (BlazorApp.Client)
public class WorkRequest
{
public string ActionType { get; set; } // "Sync" or "Async"
public string Data { get; set; }
}
public class WorkResponse
{
public bool Success { get; set; }
public string Result { get; set; }
}
public interface IWorkerManagerService
{
Task<WorkResponse> ProcessRequestAsync(WorkRequest request);
}
public class WorkerManagerService : IWorkerManagerService
{
private readonly HttpClient _httpClient;
public WorkerManagerService(HttpClient httpClient)
{
_httpClient = httpClient;
}
public async Task<WorkResponse> ProcessRequestAsync(WorkRequest request)
{
var response = await _httpClient.PostAsJsonAsync("api/workermanager/process", request);
response.EnsureSuccessStatusCode();
return await response.Content.ReadFromJsonAsync<WorkResponse>();
}
}
// Blazor Page Example
@page "/"
@inject IWorkerManagerService WorkerManagerService
<button @onclick="ProcessSync">동기 작업 실행</button>
<button @onclick="ProcessAsync">비동기 작업 실행</button>
@code {
private async Task ProcessSync()
{
var request = new WorkRequest
{
ActionType = "Sync",
Data = "Some data"
};
var result = await WorkerManagerService.ProcessRequestAsync(request);
// Handle result
}
private async Task ProcessAsync()
{
var request = new WorkRequest
{
ActionType = "Async",
Data = "Some data"
};
var result = await WorkerManagerService.ProcessRequestAsync(request);
// Handle result
}
}
// 2. Worker Manager (WorkerManager.API)
[ApiController]
[Route("api/[controller]")]
public class WorkerManagerController : ControllerBase
{
private readonly IWorkerManagerService _workerManagerService;
public WorkerManagerController(IWorkerManagerService workerManagerService)
{
_workerManagerService = workerManagerService;
}
[HttpPost("process")]
public async Task<IActionResult> ProcessRequest([FromBody] WorkRequest request)
{
var result = await _workerManagerService.ProcessRequestAsync(request);
return Ok(result);
}
}
public class WorkerManagerService : IWorkerManagerService
{
private readonly IWorkerClient _workerClient;
private readonly WorkQueue _workQueue;
private readonly Dictionary<string, TaskCompletionSource<WorkResponse>> _pendingTasks;
private readonly SemaphoreSlim _workerSemaphore;
private const int MaxConcurrentWorkers = 3;
public WorkerManagerService(IWorkerClient workerClient)
{
_workerClient = workerClient;
_workQueue = new WorkQueue();
_pendingTasks = new Dictionary<string, TaskCompletionSource<WorkResponse>>();
_workerSemaphore = new SemaphoreSlim(MaxConcurrentWorkers);
}
public async Task<WorkResponse> ProcessRequestAsync(WorkRequest request)
{
var taskCompletionSource = new TaskCompletionSource<WorkResponse>();
var requestId = Guid.NewGuid().ToString();
lock (_pendingTasks)
{
_pendingTasks[requestId] = taskCompletionSource;
}
_workQueue.Enqueue(new QueuedWork
{
RequestId = requestId,
Request = request
});
_ = ProcessQueueAsync();
if (request.ActionType == "Async")
{
return new WorkResponse { Success = true, Result = "작업이 큐에 등록되었습니다." };
}
using var cts = new CancellationTokenSource(TimeSpan.FromMinutes(5));
try
{
return await taskCompletionSource.Task.WaitAsync(cts.Token);
}
catch (OperationCanceledException)
{
throw new TimeoutException("Worker processing timed out");
}
}
private async Task ProcessQueueAsync()
{
while (_workQueue.TryDequeue(out var work))
{
await _workerSemaphore.WaitAsync();
try
{
_ = ProcessWorkItemAsync(work);
}
catch
{
_workerSemaphore.Release();
}
}
}
private async Task ProcessWorkItemAsync(QueuedWork work)
{
try
{
var response = await _workerClient.ProcessAsync(work.Request);
if (work.Request.ActionType == "Sync")
{
TaskCompletionSource<WorkResponse> tcs;
lock (_pendingTasks)
{
if (_pendingTasks.TryGetValue(work.RequestId, out tcs))
{
_pendingTasks.Remove(work.RequestId);
}
}
tcs?.SetResult(response);
}
}
catch (Exception ex)
{
if (work.Request.ActionType == "Sync")
{
TaskCompletionSource<WorkResponse> tcs;
lock (_pendingTasks)
{
if (_pendingTasks.TryGetValue(work.RequestId, out tcs))
{
_pendingTasks.Remove(work.RequestId);
}
}
tcs?.SetException(ex);
}
}
finally
{
_workerSemaphore.Release();
}
}
}
// 3. Worker (Worker.API)
[ApiController]
[Route("api/[controller]")]
public class WorkerController : ControllerBase
{
private readonly ICustomActionService _customActionService;
private readonly IBackgroundTaskQueue _taskQueue;
public WorkerController(
ICustomActionService customActionService,
IBackgroundTaskQueue taskQueue)
{
_customActionService = customActionService;
_taskQueue = taskQueue;
}
[HttpPost("process")]
public async Task<IActionResult> Process([FromBody] WorkRequest request)
{
if (request.ActionType == "Async")
{
await _taskQueue.QueueBackgroundWorkItemAsync(async token =>
{
await _customActionService.ExecuteActionAsync(request.Data);
});
return Ok(new WorkResponse
{
Success = true,
Result = "작업이 백그라운드에서 실행됩니다."
});
}
var result = await _customActionService.ExecuteActionAsync(request.Data);
return Ok(new WorkResponse
{
Success = true,
Result = result
});
}
}
// Custom Action Service
public interface ICustomActionService
{
Task<string> ExecuteActionAsync(string data);
}
public class CustomActionService : ICustomActionService
{
public async Task<string> ExecuteActionAsync(string data)
{
// 실제 작업 수행 로직
await Task.Delay(1000); // 예시: 시간이 걸리는 작업
return $"Processed: {data}";
}
}
// Background Task Queue
public interface IBackgroundTaskQueue
{
ValueTask QueueBackgroundWorkItemAsync(Func<CancellationToken, Task> workItem);
Task<Func<CancellationToken, Task>> DequeueAsync(CancellationToken cancellationToken);
}
public class BackgroundTaskQueue : IBackgroundTaskQueue
{
private readonly Channel<Func<CancellationToken, Task>> _queue;
public BackgroundTaskQueue(int capacity = 100)
{
var options = new BoundedChannelOptions(capacity)
{
FullMode = BoundedChannelFullMode.Wait
};
_queue = Channel.CreateBounded<Func<CancellationToken, Task>>(options);
}
public async ValueTask QueueBackgroundWorkItemAsync(Func<CancellationToken, Task> workItem)
{
await _queue.Writer.WriteAsync(workItem);
}
public async Task<Func<CancellationToken, Task>> DequeueAsync(CancellationToken cancellationToken)
{
return await _queue.Reader.ReadAsync(cancellationToken);
}
}
// Background Service for processing async tasks
public class QueuedHostedService : BackgroundService
{
private readonly IBackgroundTaskQueue _taskQueue;
private readonly ILogger<QueuedHostedService> _logger;
public QueuedHostedService(
IBackgroundTaskQueue taskQueue,
ILogger<QueuedHostedService> logger)
{
_taskQueue = taskQueue;
_logger = logger;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
var workItem = await _taskQueue.DequeueAsync(stoppingToken);
try
{
await workItem(stoppingToken);
}
catch (Exception ex)
{
_logger.LogError(ex, "Error occurred executing background work item.");
}
}
}
}