worker2
// Worker Manager에 상태 모델 추가
/// <summary>
/// Snapshot of the worker manager's load: how many work items are waiting in
/// the queue and which ones are being executed right now.
/// </summary>
public class WorkerManagerStatus
{
    /// <summary>Number of work items still waiting in the queue.</summary>
    public int QueuedWorkCount { get; set; }

    /// <summary>Number of worker slots currently in use.</summary>
    public int ActiveWorkerCount { get; set; }

    /// <summary>
    /// Work items that are currently executing. Starts out as an empty list so
    /// a freshly constructed status can be enumerated without a null check.
    /// </summary>
    public List<WorkItemStatus> ActiveWorks { get; set; } = new List<WorkItemStatus>();
}
/// <summary>Describes one work item that is currently being executed.</summary>
public class WorkItemStatus
{
    /// <summary>Identifier of the queued request this item belongs to.</summary>
    public string RequestId { get; set; }

    /// <summary>Action type copied from the originating work request.</summary>
    public string ActionType { get; set; }

    /// <summary>Moment at which processing of the item started.</summary>
    public DateTime StartTime { get; set; }
}
// Worker Manager Service에 상태 추적 기능 추가
/// <summary>
/// Contract of the worker manager: accepts work requests and reports its
/// current load.
/// </summary>
public interface IWorkerManagerService
{
    /// <summary>Submits a work request and yields the response produced for it.</summary>
    /// <param name="request">The work to perform.</param>
    Task<WorkResponse> ProcessRequestAsync(WorkRequest request);

    /// <summary>Yields a snapshot of queued and active work.</summary>
    Task<WorkerManagerStatus> GetStatusAsync();
}
/// <summary>
/// Worker manager service with status tracking: records which work items are
/// currently executing so that <see cref="GetStatusAsync"/> can report them.
/// </summary>
/// <remarks>
/// NOTE(review): this listing contains only the status-related members; the
/// request/queue entry points demanded by <see cref="IWorkerManagerService"/>
/// are not shown here and presumably live in the full service - confirm.
/// </remarks>
public class WorkerManagerService : IWorkerManagerService
{
    private readonly IWorkerClient _workerClient;
    private readonly WorkQueue _workQueue;
    private readonly Dictionary<string, TaskCompletionSource<WorkResponse>> _pendingTasks;
    private readonly SemaphoreSlim _workerSemaphore;
    private readonly ConcurrentDictionary<string, WorkItemStatus> _activeWorks;
    private const int MaxConcurrentWorkers = 3;

    /// <summary>Creates the service with an empty queue and all worker slots free.</summary>
    /// <param name="workerClient">Client used to hand work items to a worker.</param>
    public WorkerManagerService(IWorkerClient workerClient)
    {
        _workerClient = workerClient;
        _workQueue = new WorkQueue();
        _pendingTasks = new Dictionary<string, TaskCompletionSource<WorkResponse>>();
        _workerSemaphore = new SemaphoreSlim(MaxConcurrentWorkers);
        _activeWorks = new ConcurrentDictionary<string, WorkItemStatus>();
    }

    /// <summary>
    /// Builds a snapshot of the current load. The active worker count is the
    /// number of semaphore slots in use (total slots minus free slots).
    /// </summary>
    /// <returns>An already-completed task holding the snapshot.</returns>
    public Task<WorkerManagerStatus> GetStatusAsync()
    {
        // Nothing is awaited here, so return a completed task instead of
        // paying for an async state machine.
        var snapshot = new WorkerManagerStatus
        {
            QueuedWorkCount = _workQueue.Count,
            ActiveWorkerCount = MaxConcurrentWorkers - _workerSemaphore.CurrentCount,
            ActiveWorks = _activeWorks.Values.ToList()
        };
        return Task.FromResult(snapshot);
    }

    /// <summary>
    /// Sends one work item to the worker, tracks it as active while it runs and
    /// completes the waiting caller (if any) with the response or the failure.
    /// Always frees one worker slot when done; the slot is presumably taken by
    /// the queue pump before this method is called - confirm against caller.
    /// </summary>
    /// <param name="work">The dequeued work item.</param>
    private async Task ProcessWorkItemAsync(QueuedWork work)
    {
        try
        {
            // Built inside the try block: a failure here (e.g. a null request)
            // must still reach the finally block, otherwise the slot is lost.
            _activeWorks.TryAdd(work.RequestId, new WorkItemStatus
            {
                RequestId = work.RequestId,
                ActionType = work.Request.ActionType,
                StartTime = DateTime.UtcNow
            });

            var response = await _workerClient.ProcessAsync(work.Request);
            TakePendingTask(work)?.TrySetResult(response);
        }
        catch (Exception ex)
        {
            // NOTE(review): failures of fire-and-forget ("Async") items are not
            // reported anywhere; consider injecting a logger.
            TakePendingTask(work)?.TrySetException(ex);
        }
        finally
        {
            _activeWorks.TryRemove(work.RequestId, out _);
            _workerSemaphore.Release();
        }
    }

    /// <summary>
    /// Removes the completion source registered for the work item, whatever its
    /// action type, so that no registration is left behind.
    /// </summary>
    /// <returns>
    /// The completion source to complete, or <c>null</c> when none was
    /// registered or when the request is "Async" (nobody awaits those).
    /// </returns>
    private TaskCompletionSource<WorkResponse> TakePendingTask(QueuedWork work)
    {
        TaskCompletionSource<WorkResponse> pending;
        lock (_pendingTasks)
        {
            if (!_pendingTasks.Remove(work.RequestId, out pending))
            {
                return null;
            }
        }

        // Completing an unobserved task with an exception would only surface
        // as an unobserved task exception, so "Async" entries are just dropped.
        return work.Request?.ActionType == "Async" ? null : pending;
    }
}
// WorkQueue에 카운트 프로퍼티 추가
/// <summary>
/// FIFO queue of pending work items, backed by a
/// <see cref="ConcurrentQueue{T}"/>.
/// </summary>
public class WorkQueue
{
    private readonly ConcurrentQueue<QueuedWork> _items = new();

    /// <summary>Number of work items currently waiting in the queue.</summary>
    public int Count
    {
        get { return _items.Count; }
    }

    /// <summary>Appends a work item to the end of the queue.</summary>
    public void Enqueue(QueuedWork work) => _items.Enqueue(work);

    /// <summary>Tries to take the oldest work item off the queue.</summary>
    /// <param name="work">Receives the dequeued item when one was available.</param>
    /// <returns><c>true</c> when an item was dequeued; otherwise <c>false</c>.</returns>
    public bool TryDequeue(out QueuedWork work) => _items.TryDequeue(out work);
}
// Worker Manager Controller에 상태 조회 API 추가
/// <summary>
/// HTTP facade of the worker manager: exposes status queries and work
/// submission by delegating to <see cref="IWorkerManagerService"/>.
/// </summary>
[ApiController]
[Route("api/[controller]")]
public class WorkerManagerController : ControllerBase
{
    private readonly IWorkerManagerService _service;

    /// <summary>Creates the controller around the given service.</summary>
    public WorkerManagerController(IWorkerManagerService workerManagerService)
        => _service = workerManagerService;

    /// <summary>GET status: responds 200 with the current status snapshot.</summary>
    [HttpGet("status")]
    public async Task<IActionResult> GetStatus()
        => Ok(await _service.GetStatusAsync());

    /// <summary>POST process: forwards the request body to the service and responds 200 with its result.</summary>
    [HttpPost("process")]
    public async Task<IActionResult> ProcessRequest([FromBody] WorkRequest request)
        => Ok(await _service.ProcessRequestAsync(request));
}
// Blazor 클라이언트에 모니터링 페이지 추가
@page "/monitor"
@inject IWorkerManagerService WorkerManagerService
@implements IDisposable
@* Live worker dashboard: polls the worker manager status about once per second and renders it. *@
<h3>Worker 모니터링</h3>
@if (loadError is not null)
{
    <div class="alert alert-warning">상태 정보를 가져오지 못했습니다: @loadError</div>
}
<div class="card mb-4">
    <div class="card-body">
        <div class="row">
            <div class="col-md-6">
                <h5>대기 중인 작업</h5>
                <h2>@status?.QueuedWorkCount</h2>
            </div>
            <div class="col-md-6">
                <h5>실행 중인 작업</h5>
                <h2>@status?.ActiveWorkerCount</h2>
            </div>
        </div>
    </div>
</div>
@if (status?.ActiveWorks?.Any() == true)
{
    <div class="card">
        <div class="card-header">
            <h5>실행 중인 작업 목록</h5>
        </div>
        <div class="card-body">
            <table class="table">
                <thead>
                    <tr>
                        <th>요청 ID</th>
                        <th>작업 유형</th>
                        <th>시작 시간</th>
                        <th>경과 시간</th>
                    </tr>
                </thead>
                <tbody>
                    @foreach (var work in status.ActiveWorks)
                    {
                        <tr>
                            <td>@work.RequestId</td>
                            <td>@work.ActionType</td>
                            <td>@work.StartTime.ToLocalTime().ToString("yyyy-MM-dd HH:mm:ss")</td>
                            <td>@FormatElapsed(work.StartTime)</td>
                        </tr>
                    }
                </tbody>
            </table>
        </div>
    </div>
}
@code {
    // Pause between two status refreshes.
    private static readonly TimeSpan RefreshInterval = TimeSpan.FromSeconds(1);

    // Cancelled in Dispose so the polling loop stops with the component.
    private readonly CancellationTokenSource disposalCts = new();

    // Last successfully loaded snapshot; null until the first load completes.
    private WorkerManagerStatus status;

    // Message of the most recent failed refresh; null while refreshes succeed.
    private string loadError;

    protected override void OnInitialized()
    {
        // Fire-and-forget is safe here: PollStatusAsync handles its own failures.
        _ = PollStatusAsync(disposalCts.Token);
    }

    // Refreshes the status immediately and then once per interval until the
    // component is disposed. Unlike an async timer callback (async void), an
    // exception from the service cannot escape and refreshes never overlap.
    private async Task PollStatusAsync(CancellationToken cancellationToken)
    {
        try
        {
            while (!cancellationToken.IsCancellationRequested)
            {
                try
                {
                    status = await WorkerManagerService.GetStatusAsync();
                    loadError = null;
                }
                catch (Exception ex)
                {
                    // Keep showing the last good snapshot and surface the failure.
                    loadError = ex.Message;
                }

                if (cancellationToken.IsCancellationRequested)
                {
                    break;
                }

                await InvokeAsync(StateHasChanged);
                await Task.Delay(RefreshInterval, cancellationToken);
            }
        }
        catch (OperationCanceledException)
        {
            // Component was disposed while waiting; nothing left to do.
        }
        catch (ObjectDisposedException)
        {
            // Component was torn down between the cancellation check and the render request.
        }
    }

    // Formats the time since startTime as HH:mm:ss. Hours keep counting past 24
    // (the hh specifier would drop whole days) and a start time in the future
    // (clock skew) is shown as zero instead of a sign-less negative span.
    private static string FormatElapsed(DateTime startTime)
    {
        var elapsed = DateTime.UtcNow - startTime;
        if (elapsed < TimeSpan.Zero)
        {
            elapsed = TimeSpan.Zero;
        }

        return $"{(int)elapsed.TotalHours:00}:{elapsed.Minutes:00}:{elapsed.Seconds:00}";
    }

    public void Dispose()
    {
        disposalCts.Cancel();
        disposalCts.Dispose();
    }
}
// 1. Blazor SSR Client (BlazorApp.Client)
/// <summary>Work request sent from the client to the worker manager.</summary>
public class WorkRequest
{
    /// <summary>Processing mode of the request: "Sync" or "Async".</summary>
    public string ActionType { get; set; }

    /// <summary>Payload handed to the action that performs the work.</summary>
    public string Data { get; set; }
}
/// <summary>Outcome of a work request.</summary>
public class WorkResponse
{
    /// <summary>Whether the request was handled successfully.</summary>
    public bool Success { get; set; }

    /// <summary>Result text: the action's output or a status message.</summary>
    public string Result { get; set; }
}
/// <summary>Client-side contract for submitting work to the worker manager.</summary>
public interface IWorkerManagerService
{
    /// <summary>Submits a work request and yields the response produced for it.</summary>
    /// <param name="request">The work to perform.</param>
    Task<WorkResponse> ProcessRequestAsync(WorkRequest request);
}
/// <summary>
/// HTTP client for the worker manager API: posts work requests as JSON and
/// reads the JSON response.
/// </summary>
public class WorkerManagerService : IWorkerManagerService
{
    private readonly HttpClient _httpClient;

    /// <summary>Creates the client on top of the given <see cref="HttpClient"/>.</summary>
    public WorkerManagerService(HttpClient httpClient)
    {
        _httpClient = httpClient;
    }

    /// <summary>
    /// Posts the request to <c>api/workermanager/process</c> and returns the
    /// deserialized response.
    /// </summary>
    /// <param name="request">The work to perform.</param>
    /// <returns>The response sent back by the worker manager; never <c>null</c>.</returns>
    /// <exception cref="HttpRequestException">The server answered with a non-success status code.</exception>
    /// <exception cref="InvalidOperationException">The response body deserialized to <c>null</c>.</exception>
    public async Task<WorkResponse> ProcessRequestAsync(WorkRequest request)
    {
        // Dispose the response so its content stream and connection are released promptly.
        using var response = await _httpClient.PostAsJsonAsync("api/workermanager/process", request);
        response.EnsureSuccessStatusCode();

        // A literal JSON "null" body deserializes to null; fail here with a
        // clear message instead of handing callers a null response.
        var result = await response.Content.ReadFromJsonAsync<WorkResponse>();
        return result ?? throw new InvalidOperationException("Worker manager returned an empty response body.");
    }
}
// Blazor Page Example
@page "/"
@inject IWorkerManagerService WorkerManagerService
@* Demo page: one button submits a "Sync" request, the other an "Async" request. *@
<button @onclick="ProcessSync">동기 작업 실행</button>
<button @onclick="ProcessAsync">비동기 작업 실행</button>
@code {
    // Builds a request of the given action type and submits it to the worker manager.
    private Task<WorkResponse> SubmitAsync(string actionType)
    {
        var request = new WorkRequest
        {
            ActionType = actionType,
            Data = "Some data"
        };
        return WorkerManagerService.ProcessRequestAsync(request);
    }

    // Click handler of the first button: submits a "Sync" request.
    private async Task ProcessSync()
    {
        var result = await SubmitAsync("Sync");
        // Handle result
    }

    // Click handler of the second button: submits an "Async" request.
    private async Task ProcessAsync()
    {
        var result = await SubmitAsync("Async");
        // Handle result
    }
}
// 2. Worker Manager (WorkerManager.API)
/// <summary>
/// HTTP facade of the worker manager: accepts work requests and delegates
/// them to <see cref="IWorkerManagerService"/>.
/// </summary>
[ApiController]
[Route("api/[controller]")]
public class WorkerManagerController : ControllerBase
{
    private readonly IWorkerManagerService _service;

    /// <summary>Creates the controller around the given service.</summary>
    public WorkerManagerController(IWorkerManagerService workerManagerService)
        => _service = workerManagerService;

    /// <summary>POST process: forwards the request body to the service and responds 200 with its result.</summary>
    [HttpPost("process")]
    public async Task<IActionResult> ProcessRequest([FromBody] WorkRequest request)
        => Ok(await _service.ProcessRequestAsync(request));
}
/// <summary>
/// Queues incoming work requests and forwards them to the worker, running at
/// most <c>MaxConcurrentWorkers</c> of them at the same time. "Async" requests
/// are acknowledged as soon as they are queued; every other request waits for
/// the worker's response.
/// </summary>
public class WorkerManagerService : IWorkerManagerService
{
    private const int MaxConcurrentWorkers = 3;

    // How long a caller waits for the worker before giving up.
    private static readonly TimeSpan SyncRequestTimeout = TimeSpan.FromMinutes(5);

    private readonly IWorkerClient _workerClient;
    private readonly WorkQueue _workQueue;
    private readonly Dictionary<string, TaskCompletionSource<WorkResponse>> _pendingTasks;
    private readonly SemaphoreSlim _workerSemaphore;

    /// <summary>Creates the service with an empty queue and all worker slots free.</summary>
    /// <param name="workerClient">Client used to hand work items to a worker.</param>
    public WorkerManagerService(IWorkerClient workerClient)
    {
        _workerClient = workerClient;
        _workQueue = new WorkQueue();
        _pendingTasks = new Dictionary<string, TaskCompletionSource<WorkResponse>>();
        _workerSemaphore = new SemaphoreSlim(MaxConcurrentWorkers);
    }

    /// <summary>
    /// Queues the request for processing. For "Async" requests an
    /// acknowledgement is returned immediately; otherwise the call completes
    /// with the worker's response.
    /// </summary>
    /// <param name="request">The work to perform.</param>
    /// <returns>The acknowledgement or the worker's response.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="request"/> is null.</exception>
    /// <exception cref="TimeoutException">No response arrived within five minutes.</exception>
    public async Task<WorkResponse> ProcessRequestAsync(WorkRequest request)
    {
        // Reject null before anything is queued; otherwise a broken item would
        // already sit in the queue when the null dereference happens.
        ArgumentNullException.ThrowIfNull(request);

        var requestId = Guid.NewGuid().ToString();
        var fireAndForget = request.ActionType == "Async";

        // Only callers that actually wait get a completion source. Registering
        // one for "Async" requests would leave an entry nobody ever removes.
        TaskCompletionSource<WorkResponse> completion = null;
        if (!fireAndForget)
        {
            // RunContinuationsAsynchronously keeps the waiting caller's
            // continuation from running inline on the worker path.
            completion = new TaskCompletionSource<WorkResponse>(TaskCreationOptions.RunContinuationsAsynchronously);
            lock (_pendingTasks)
            {
                _pendingTasks[requestId] = completion;
            }
        }

        _workQueue.Enqueue(new QueuedWork
        {
            RequestId = requestId,
            Request = request
        });
        _ = ProcessQueueAsync();

        if (fireAndForget)
        {
            return new WorkResponse { Success = true, Result = "작업이 큐에 등록되었습니다." };
        }

        using var cts = new CancellationTokenSource(SyncRequestTimeout);
        try
        {
            return await completion.Task.WaitAsync(cts.Token);
        }
        catch (OperationCanceledException)
        {
            // Nobody is waiting any more: drop the registration right away
            // instead of keeping it until the work item eventually finishes.
            TakePendingTask(requestId);
            throw new TimeoutException("Worker processing timed out");
        }
    }

    /// <summary>
    /// Starts queued work items for as long as both a free worker slot and a
    /// queued item are available.
    /// </summary>
    private async Task ProcessQueueAsync()
    {
        while (true)
        {
            // Take the slot first so an item stays in the queue (and in its
            // count) until it can really start.
            await _workerSemaphore.WaitAsync();
            if (!_workQueue.TryDequeue(out var work))
            {
                _workerSemaphore.Release();
                return;
            }

            // ProcessWorkItemAsync owns the slot from here on and releases it
            // in its finally block; it reports failures through its task and
            // never throws synchronously.
            _ = ProcessWorkItemAsync(work);
        }
    }

    /// <summary>
    /// Sends one work item to the worker and completes the waiting caller, if
    /// any, with the response or the failure. Always frees the worker slot.
    /// </summary>
    /// <param name="work">The dequeued work item.</param>
    private async Task ProcessWorkItemAsync(QueuedWork work)
    {
        try
        {
            var response = await _workerClient.ProcessAsync(work.Request);
            TakePendingTask(work.RequestId)?.TrySetResult(response);
        }
        catch (Exception ex)
        {
            // NOTE(review): failures of "Async" items have no waiter and are
            // not reported anywhere; consider injecting a logger.
            TakePendingTask(work.RequestId)?.TrySetException(ex);
        }
        finally
        {
            _workerSemaphore.Release();
        }
    }

    /// <summary>
    /// Removes and returns the completion source registered for the request.
    /// </summary>
    /// <returns>The completion source, or <c>null</c> when none is registered.</returns>
    private TaskCompletionSource<WorkResponse> TakePendingTask(string requestId)
    {
        lock (_pendingTasks)
        {
            return _pendingTasks.Remove(requestId, out var pending) ? pending : null;
        }
    }
}
// 3. Worker (Worker.API)
/// <summary>
/// Worker endpoint: runs the custom action for a request either inline or,
/// for "Async" requests, through the background task queue.
/// </summary>
[ApiController]
[Route("api/[controller]")]
public class WorkerController : ControllerBase
{
    private readonly ICustomActionService _actions;
    private readonly IBackgroundTaskQueue _backgroundQueue;

    /// <summary>Creates the controller with the action service and the background queue.</summary>
    public WorkerController(
        ICustomActionService customActionService,
        IBackgroundTaskQueue taskQueue)
    {
        _actions = customActionService;
        _backgroundQueue = taskQueue;
    }

    /// <summary>
    /// POST process: for "Async" requests the action is queued as a background
    /// work item and an acknowledgement is returned; for every other request
    /// the action is awaited and its output returned.
    /// </summary>
    [HttpPost("process")]
    public async Task<IActionResult> Process([FromBody] WorkRequest request)
    {
        if (request.ActionType != "Async")
        {
            // Inline path: run the action and hand back its output.
            var output = await _actions.ExecuteActionAsync(request.Data);
            var completed = new WorkResponse
            {
                Success = true,
                Result = output
            };
            return Ok(completed);
        }

        // Background path: the queued delegate does not use the token it is given.
        await _backgroundQueue.QueueBackgroundWorkItemAsync(async _ =>
        {
            await _actions.ExecuteActionAsync(request.Data);
        });

        var accepted = new WorkResponse
        {
            Success = true,
            Result = "작업이 백그라운드에서 실행됩니다."
        };
        return Ok(accepted);
    }
}
// Custom Action Service
/// <summary>Performs the actual work behind a work request.</summary>
public interface ICustomActionService
{
    /// <summary>Runs the action on the given payload and yields its textual result.</summary>
    /// <param name="data">Payload of the work request.</param>
    Task<string> ExecuteActionAsync(string data);
}
/// <summary>
/// Sample <see cref="ICustomActionService"/>: stands in for real work with a
/// one-second delay and echoes the payload back.
/// </summary>
public class CustomActionService : ICustomActionService
{
    /// <summary>Waits one second, then returns the payload prefixed with "Processed: ".</summary>
    /// <param name="data">Payload to echo back.</param>
    public async Task<string> ExecuteActionAsync(string data)
    {
        // Placeholder for the real work: simulates a slow operation.
        const int simulatedWorkMilliseconds = 1000;
        await Task.Delay(simulatedWorkMilliseconds);

        var outcome = $"Processed: {data}";
        return outcome;
    }
}
// Background Task Queue
/// <summary>Queue of background work items handed from producers to a consumer.</summary>
public interface IBackgroundTaskQueue
{
    /// <summary>Adds a work item to the queue.</summary>
    /// <param name="workItem">Delegate to run later; it receives a cancellation token.</param>
    ValueTask QueueBackgroundWorkItemAsync(Func<CancellationToken, Task> workItem);

    /// <summary>Takes the next work item from the queue.</summary>
    /// <param name="cancellationToken">Token that cancels the wait.</param>
    Task<Func<CancellationToken, Task>> DequeueAsync(CancellationToken cancellationToken);
}
/// <summary>
/// <see cref="IBackgroundTaskQueue"/> backed by a bounded channel. The channel
/// is created with <see cref="BoundedChannelFullMode.Wait"/>, so writes wait
/// for free space once the capacity is reached.
/// </summary>
public class BackgroundTaskQueue : IBackgroundTaskQueue
{
    private readonly Channel<Func<CancellationToken, Task>> _queue;

    /// <summary>Creates the queue.</summary>
    /// <param name="capacity">Maximum number of work items held at once.</param>
    public BackgroundTaskQueue(int capacity = 100)
    {
        var options = new BoundedChannelOptions(capacity)
        {
            FullMode = BoundedChannelFullMode.Wait
        };
        _queue = Channel.CreateBounded<Func<CancellationToken, Task>>(options);
    }

    /// <summary>Writes the work item to the channel.</summary>
    /// <param name="workItem">Delegate to run later.</param>
    /// <exception cref="ArgumentNullException"><paramref name="workItem"/> is null.</exception>
    public async ValueTask QueueBackgroundWorkItemAsync(Func<CancellationToken, Task> workItem)
    {
        // Reject null at the producer: a queued null delegate would only fail
        // later, in the consumer, far away from the code that caused it.
        ArgumentNullException.ThrowIfNull(workItem);

        await _queue.Writer.WriteAsync(workItem);
    }

    /// <summary>Reads the next work item from the channel.</summary>
    /// <param name="cancellationToken">Token that cancels the read.</param>
    public async Task<Func<CancellationToken, Task>> DequeueAsync(CancellationToken cancellationToken)
    {
        return await _queue.Reader.ReadAsync(cancellationToken);
    }
}
// Background Service for processing async tasks
/// <summary>
/// Hosted service that takes work items from the
/// <see cref="IBackgroundTaskQueue"/> and runs them one after another.
/// </summary>
public class QueuedHostedService : BackgroundService
{
    private readonly IBackgroundTaskQueue _taskQueue;
    private readonly ILogger<QueuedHostedService> _logger;

    /// <summary>Creates the service with the queue to drain and a logger.</summary>
    public QueuedHostedService(
        IBackgroundTaskQueue taskQueue,
        ILogger<QueuedHostedService> logger)
    {
        _taskQueue = taskQueue;
        _logger = logger;
    }

    /// <summary>
    /// Dequeues and executes work items until <paramref name="stoppingToken"/>
    /// is signalled. A failing work item is logged and does not stop the loop;
    /// cancellation caused by the stopping token ends the loop quietly.
    /// </summary>
    /// <param name="stoppingToken">Signalled when the service should stop.</param>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        try
        {
            while (!stoppingToken.IsCancellationRequested)
            {
                var workItem = await _taskQueue.DequeueAsync(stoppingToken);
                try
                {
                    await workItem(stoppingToken);
                }
                catch (Exception ex) when (ex is not OperationCanceledException || !stoppingToken.IsCancellationRequested)
                {
                    // Real failures are logged; cancellation triggered by the
                    // stopping token is left to the outer handler.
                    _logger.LogError(ex, "Error occurred executing background work item.");
                }
            }
        }
        catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
        {
            // The stopping token fired while waiting for or running a work
            // item: a normal stop, so finish without an error.
        }
    }
}
// Worker Manager에 상태 모델 추가
/// <summary>
/// Snapshot of the worker manager's load: how many work items are waiting in
/// the queue and which ones are being executed right now.
/// </summary>
public class WorkerManagerStatus
{
    /// <summary>Number of work items still waiting in the queue.</summary>
    public int QueuedWorkCount { get; set; }

    /// <summary>Number of worker slots currently in use.</summary>
    public int ActiveWorkerCount { get; set; }

    /// <summary>
    /// Work items that are currently executing. Starts out as an empty list so
    /// a freshly constructed status can be enumerated without a null check.
    /// </summary>
    public List<WorkItemStatus> ActiveWorks { get; set; } = new List<WorkItemStatus>();
}
/// <summary>Describes one work item that is currently being executed.</summary>
public class WorkItemStatus
{
    /// <summary>Identifier of the queued request this item belongs to.</summary>
    public string RequestId { get; set; }

    /// <summary>Action type copied from the originating work request.</summary>
    public string ActionType { get; set; }

    /// <summary>Moment at which processing of the item started.</summary>
    public DateTime StartTime { get; set; }
}
// Worker Manager Service에 상태 추적 기능 추가
/// <summary>
/// Contract of the worker manager: accepts work requests and reports its
/// current load.
/// </summary>
public interface IWorkerManagerService
{
    /// <summary>Submits a work request and yields the response produced for it.</summary>
    /// <param name="request">The work to perform.</param>
    Task<WorkResponse> ProcessRequestAsync(WorkRequest request);

    /// <summary>Yields a snapshot of queued and active work.</summary>
    Task<WorkerManagerStatus> GetStatusAsync();
}
/// <summary>
/// Worker manager service with status tracking: records which work items are
/// currently executing so that <see cref="GetStatusAsync"/> can report them.
/// </summary>
/// <remarks>
/// NOTE(review): this listing contains only the status-related members; the
/// request/queue entry points demanded by <see cref="IWorkerManagerService"/>
/// are not shown here and presumably live in the full service - confirm.
/// </remarks>
public class WorkerManagerService : IWorkerManagerService
{
    private readonly IWorkerClient _workerClient;
    private readonly WorkQueue _workQueue;
    private readonly Dictionary<string, TaskCompletionSource<WorkResponse>> _pendingTasks;
    private readonly SemaphoreSlim _workerSemaphore;
    private readonly ConcurrentDictionary<string, WorkItemStatus> _activeWorks;
    private const int MaxConcurrentWorkers = 3;

    /// <summary>Creates the service with an empty queue and all worker slots free.</summary>
    /// <param name="workerClient">Client used to hand work items to a worker.</param>
    public WorkerManagerService(IWorkerClient workerClient)
    {
        _workerClient = workerClient;
        _workQueue = new WorkQueue();
        _pendingTasks = new Dictionary<string, TaskCompletionSource<WorkResponse>>();
        _workerSemaphore = new SemaphoreSlim(MaxConcurrentWorkers);
        _activeWorks = new ConcurrentDictionary<string, WorkItemStatus>();
    }

    /// <summary>
    /// Builds a snapshot of the current load. The active worker count is the
    /// number of semaphore slots in use (total slots minus free slots).
    /// </summary>
    /// <returns>An already-completed task holding the snapshot.</returns>
    public Task<WorkerManagerStatus> GetStatusAsync()
    {
        // Nothing is awaited here, so return a completed task instead of
        // paying for an async state machine.
        var snapshot = new WorkerManagerStatus
        {
            QueuedWorkCount = _workQueue.Count,
            ActiveWorkerCount = MaxConcurrentWorkers - _workerSemaphore.CurrentCount,
            ActiveWorks = _activeWorks.Values.ToList()
        };
        return Task.FromResult(snapshot);
    }

    /// <summary>
    /// Sends one work item to the worker, tracks it as active while it runs and
    /// completes the waiting caller (if any) with the response or the failure.
    /// Always frees one worker slot when done; the slot is presumably taken by
    /// the queue pump before this method is called - confirm against caller.
    /// </summary>
    /// <param name="work">The dequeued work item.</param>
    private async Task ProcessWorkItemAsync(QueuedWork work)
    {
        try
        {
            // Built inside the try block: a failure here (e.g. a null request)
            // must still reach the finally block, otherwise the slot is lost.
            _activeWorks.TryAdd(work.RequestId, new WorkItemStatus
            {
                RequestId = work.RequestId,
                ActionType = work.Request.ActionType,
                StartTime = DateTime.UtcNow
            });

            var response = await _workerClient.ProcessAsync(work.Request);
            TakePendingTask(work)?.TrySetResult(response);
        }
        catch (Exception ex)
        {
            // NOTE(review): failures of fire-and-forget ("Async") items are not
            // reported anywhere; consider injecting a logger.
            TakePendingTask(work)?.TrySetException(ex);
        }
        finally
        {
            _activeWorks.TryRemove(work.RequestId, out _);
            _workerSemaphore.Release();
        }
    }

    /// <summary>
    /// Removes the completion source registered for the work item, whatever its
    /// action type, so that no registration is left behind.
    /// </summary>
    /// <returns>
    /// The completion source to complete, or <c>null</c> when none was
    /// registered or when the request is "Async" (nobody awaits those).
    /// </returns>
    private TaskCompletionSource<WorkResponse> TakePendingTask(QueuedWork work)
    {
        TaskCompletionSource<WorkResponse> pending;
        lock (_pendingTasks)
        {
            if (!_pendingTasks.Remove(work.RequestId, out pending))
            {
                return null;
            }
        }

        // Completing an unobserved task with an exception would only surface
        // as an unobserved task exception, so "Async" entries are just dropped.
        return work.Request?.ActionType == "Async" ? null : pending;
    }
}
// WorkQueue에 카운트 프로퍼티 추가
/// <summary>
/// FIFO queue of pending work items, backed by a
/// <see cref="ConcurrentQueue{T}"/>.
/// </summary>
public class WorkQueue
{
    private readonly ConcurrentQueue<QueuedWork> _items = new();

    /// <summary>Number of work items currently waiting in the queue.</summary>
    public int Count
    {
        get { return _items.Count; }
    }

    /// <summary>Appends a work item to the end of the queue.</summary>
    public void Enqueue(QueuedWork work) => _items.Enqueue(work);

    /// <summary>Tries to take the oldest work item off the queue.</summary>
    /// <param name="work">Receives the dequeued item when one was available.</param>
    /// <returns><c>true</c> when an item was dequeued; otherwise <c>false</c>.</returns>
    public bool TryDequeue(out QueuedWork work) => _items.TryDequeue(out work);
}
// Worker Manager Controller에 상태 조회 API 추가
/// <summary>
/// HTTP facade of the worker manager: exposes status queries and work
/// submission by delegating to <see cref="IWorkerManagerService"/>.
/// </summary>
[ApiController]
[Route("api/[controller]")]
public class WorkerManagerController : ControllerBase
{
    private readonly IWorkerManagerService _service;

    /// <summary>Creates the controller around the given service.</summary>
    public WorkerManagerController(IWorkerManagerService workerManagerService)
        => _service = workerManagerService;

    /// <summary>GET status: responds 200 with the current status snapshot.</summary>
    [HttpGet("status")]
    public async Task<IActionResult> GetStatus()
        => Ok(await _service.GetStatusAsync());

    /// <summary>POST process: forwards the request body to the service and responds 200 with its result.</summary>
    [HttpPost("process")]
    public async Task<IActionResult> ProcessRequest([FromBody] WorkRequest request)
        => Ok(await _service.ProcessRequestAsync(request));
}
// Blazor 클라이언트에 모니터링 페이지 추가
@page "/monitor"
@inject IWorkerManagerService WorkerManagerService
@implements IDisposable
@* Live worker dashboard: polls the worker manager status about once per second and renders it. *@
<h3>Worker 모니터링</h3>
@if (loadError is not null)
{
    <div class="alert alert-warning">상태 정보를 가져오지 못했습니다: @loadError</div>
}
<div class="card mb-4">
    <div class="card-body">
        <div class="row">
            <div class="col-md-6">
                <h5>대기 중인 작업</h5>
                <h2>@status?.QueuedWorkCount</h2>
            </div>
            <div class="col-md-6">
                <h5>실행 중인 작업</h5>
                <h2>@status?.ActiveWorkerCount</h2>
            </div>
        </div>
    </div>
</div>
@if (status?.ActiveWorks?.Any() == true)
{
    <div class="card">
        <div class="card-header">
            <h5>실행 중인 작업 목록</h5>
        </div>
        <div class="card-body">
            <table class="table">
                <thead>
                    <tr>
                        <th>요청 ID</th>
                        <th>작업 유형</th>
                        <th>시작 시간</th>
                        <th>경과 시간</th>
                    </tr>
                </thead>
                <tbody>
                    @foreach (var work in status.ActiveWorks)
                    {
                        <tr>
                            <td>@work.RequestId</td>
                            <td>@work.ActionType</td>
                            <td>@work.StartTime.ToLocalTime().ToString("yyyy-MM-dd HH:mm:ss")</td>
                            <td>@FormatElapsed(work.StartTime)</td>
                        </tr>
                    }
                </tbody>
            </table>
        </div>
    </div>
}
@code {
    // Pause between two status refreshes.
    private static readonly TimeSpan RefreshInterval = TimeSpan.FromSeconds(1);

    // Cancelled in Dispose so the polling loop stops with the component.
    private readonly CancellationTokenSource disposalCts = new();

    // Last successfully loaded snapshot; null until the first load completes.
    private WorkerManagerStatus status;

    // Message of the most recent failed refresh; null while refreshes succeed.
    private string loadError;

    protected override void OnInitialized()
    {
        // Fire-and-forget is safe here: PollStatusAsync handles its own failures.
        _ = PollStatusAsync(disposalCts.Token);
    }

    // Refreshes the status immediately and then once per interval until the
    // component is disposed. Unlike an async timer callback (async void), an
    // exception from the service cannot escape and refreshes never overlap.
    private async Task PollStatusAsync(CancellationToken cancellationToken)
    {
        try
        {
            while (!cancellationToken.IsCancellationRequested)
            {
                try
                {
                    status = await WorkerManagerService.GetStatusAsync();
                    loadError = null;
                }
                catch (Exception ex)
                {
                    // Keep showing the last good snapshot and surface the failure.
                    loadError = ex.Message;
                }

                if (cancellationToken.IsCancellationRequested)
                {
                    break;
                }

                await InvokeAsync(StateHasChanged);
                await Task.Delay(RefreshInterval, cancellationToken);
            }
        }
        catch (OperationCanceledException)
        {
            // Component was disposed while waiting; nothing left to do.
        }
        catch (ObjectDisposedException)
        {
            // Component was torn down between the cancellation check and the render request.
        }
    }

    // Formats the time since startTime as HH:mm:ss. Hours keep counting past 24
    // (the hh specifier would drop whole days) and a start time in the future
    // (clock skew) is shown as zero instead of a sign-less negative span.
    private static string FormatElapsed(DateTime startTime)
    {
        var elapsed = DateTime.UtcNow - startTime;
        if (elapsed < TimeSpan.Zero)
        {
            elapsed = TimeSpan.Zero;
        }

        return $"{(int)elapsed.TotalHours:00}:{elapsed.Minutes:00}:{elapsed.Seconds:00}";
    }

    public void Dispose()
    {
        disposalCts.Cancel();
        disposalCts.Dispose();
    }
}