// keisoft 2025. 2. 12. 08:03
// Worker Manager에 상태 모델 추가
/// <summary>
/// Snapshot of the worker manager's load: queue length, slots in use and the
/// work items that are currently running.
/// </summary>
public class WorkerManagerStatus
{
    /// <summary>Number of work items waiting in the queue.</summary>
    public int QueuedWorkCount { get; set; }

    /// <summary>Number of worker slots currently in use.</summary>
    public int ActiveWorkerCount { get; set; }

    /// <summary>
    /// Work items that are currently running. Starts as an empty list so a
    /// freshly constructed instance never exposes <c>null</c> to consumers.
    /// </summary>
    public List<WorkItemStatus> ActiveWorks { get; set; } = new();
}

/// <summary>
/// Describes a single work item that the worker manager is tracking as running.
/// </summary>
public class WorkItemStatus
{
    /// <summary>Identifier of the queued request this entry belongs to.</summary>
    public string RequestId { get; set; }
    /// <summary>Action type copied from the originating request.</summary>
    public string ActionType { get; set; }
    /// <summary>Time at which processing of the work item started.</summary>
    public DateTime StartTime { get; set; }
}

// Worker Manager Service에 상태 추적 기능 추가
/// <summary>
/// Contract for submitting work to the worker manager and querying its load.
/// </summary>
public interface IWorkerManagerService
{
    /// <summary>Submits a work request and returns the response for it.</summary>
    /// <param name="request">The work to process.</param>
    Task<WorkResponse> ProcessRequestAsync(WorkRequest request);
    /// <summary>Returns a snapshot of the queued and active work.</summary>
    Task<WorkerManagerStatus> GetStatusAsync();
}

/// <summary>
/// Worker manager service with status tracking: every work item is recorded in
/// <c>_activeWorks</c> while it runs so that <see cref="GetStatusAsync"/> can
/// report it.
/// </summary>
/// <remarks>
/// NOTE(review): this snippet only shows the status-related members; the
/// interface's <c>ProcessRequestAsync</c> is not defined here.
/// </remarks>
public class WorkerManagerService : IWorkerManagerService
{
    private readonly IWorkerClient _workerClient;
    private readonly WorkQueue _workQueue;
    private readonly Dictionary<string, TaskCompletionSource<WorkResponse>> _pendingTasks;
    private readonly SemaphoreSlim _workerSemaphore;
    private readonly ConcurrentDictionary<string, WorkItemStatus> _activeWorks;
    private const int MaxConcurrentWorkers = 3;

    /// <summary>Creates the service with an empty queue and all worker slots free.</summary>
    /// <param name="workerClient">Client used to hand work items to a worker.</param>
    public WorkerManagerService(IWorkerClient workerClient)
    {
        _workerClient = workerClient;
        _workQueue = new WorkQueue();
        _pendingTasks = new Dictionary<string, TaskCompletionSource<WorkResponse>>();
        _workerSemaphore = new SemaphoreSlim(MaxConcurrentWorkers);
        _activeWorks = new ConcurrentDictionary<string, WorkItemStatus>();
    }

    /// <summary>
    /// Builds a snapshot of the queue length, the number of worker slots in use
    /// and the currently running work items.
    /// </summary>
    /// <returns>An already completed task holding the snapshot.</returns>
    public Task<WorkerManagerStatus> GetStatusAsync()
    {
        // Nothing here is asynchronous, so return a completed task instead of
        // paying for an async state machine that never awaits.
        var snapshot = new WorkerManagerStatus
        {
            QueuedWorkCount = _workQueue.Count,
            // Slots in use = total slots minus the slots still available.
            ActiveWorkerCount = MaxConcurrentWorkers - _workerSemaphore.CurrentCount,
            ActiveWorks = _activeWorks.Values.ToList()
        };

        return Task.FromResult(snapshot);
    }

    /// <summary>
    /// Runs one work item through the worker client, tracks it in
    /// <c>_activeWorks</c> while it runs and completes the caller's pending task
    /// (if one is registered) with the response or the failure.
    /// </summary>
    /// <param name="work">The dequeued work item; a worker slot must already be held for it.</param>
    private async Task ProcessWorkItemAsync(QueuedWork work)
    {
        try
        {
            // Registered inside the try so that a failure while building the
            // status entry still reaches the finally block and frees the slot.
            _activeWorks.TryAdd(work.RequestId, new WorkItemStatus
            {
                RequestId = work.RequestId,
                ActionType = work.Request.ActionType,
                StartTime = DateTime.UtcNow
            });

            var response = await _workerClient.ProcessAsync(work.Request);

            // Complete whatever is registered for this request, regardless of the
            // action type: otherwise entries for other types are never removed
            // and their callers are never answered.
            TakePendingTask(work.RequestId)?.TrySetResult(response);
        }
        catch (Exception ex)
        {
            TakePendingTask(work.RequestId)?.TrySetException(ex);
        }
        finally
        {
            _activeWorks.TryRemove(work.RequestId, out _);
            _workerSemaphore.Release();
        }
    }

    /// <summary>
    /// Removes and returns the pending task registered for a request, or
    /// <c>null</c> when none is registered.
    /// </summary>
    private TaskCompletionSource<WorkResponse> TakePendingTask(string requestId)
    {
        lock (_pendingTasks)
        {
            return _pendingTasks.Remove(requestId, out var pending) ? pending : null;
        }
    }
}

// WorkQueue에 카운트 프로퍼티 추가
/// <summary>
/// Thin wrapper around a <see cref="ConcurrentQueue{T}"/> of pending work items
/// that also exposes the current queue length.
/// </summary>
public class WorkQueue
{
    private readonly ConcurrentQueue<QueuedWork> _items = new ConcurrentQueue<QueuedWork>();

    /// <summary>Number of work items currently waiting in the queue.</summary>
    public int Count
    {
        get { return _items.Count; }
    }

    /// <summary>Appends a work item to the end of the queue.</summary>
    public void Enqueue(QueuedWork work) => _items.Enqueue(work);

    /// <summary>Tries to remove the item at the front of the queue.</summary>
    /// <returns><c>true</c> when an item was removed; otherwise <c>false</c>.</returns>
    public bool TryDequeue(out QueuedWork work) => _items.TryDequeue(out work);
}

// Worker Manager Controller에 상태 조회 API 추가
/// <summary>
/// HTTP front end for <see cref="IWorkerManagerService"/>: exposes the status
/// snapshot and the work submission endpoint.
/// </summary>
[ApiController]
[Route("api/[controller]")]
public class WorkerManagerController : ControllerBase
{
    private readonly IWorkerManagerService _manager;

    public WorkerManagerController(IWorkerManagerService workerManagerService)
        => _manager = workerManagerService;

    /// <summary>GET status: returns the current worker manager status.</summary>
    [HttpGet("status")]
    public async Task<IActionResult> GetStatus()
        => Ok(await _manager.GetStatusAsync());

    /// <summary>POST process: forwards the request to the service and returns its response.</summary>
    [HttpPost("process")]
    public async Task<IActionResult> ProcessRequest([FromBody] WorkRequest request)
        => Ok(await _manager.ProcessRequestAsync(request));
}

// Blazor 클라이언트에 모니터링 페이지 추가
@page "/monitor"
@inject IWorkerManagerService WorkerManagerService
@implements IDisposable

@* Monitoring page: polls the worker manager status once per second and shows the
   queue length, the number of busy workers and the list of running work items. *@

<h3>Worker 모니터링</h3>

@if (loadError != null)
{
    <div class="alert alert-warning">@loadError</div>
}

<div class="card mb-4">
    <div class="card-body">
        <div class="row">
            <div class="col-md-6">
                <h5>대기 중인 작업</h5>
                <h2>@status?.QueuedWorkCount</h2>
            </div>
            <div class="col-md-6">
                <h5>실행 중인 작업</h5>
                <h2>@status?.ActiveWorkerCount</h2>
            </div>
        </div>
    </div>
</div>

@if (status?.ActiveWorks?.Any() == true)
{
    <div class="card">
        <div class="card-header">
            <h5>실행 중인 작업 목록</h5>
        </div>
        <div class="card-body">
            <table class="table">
                <thead>
                    <tr>
                        <th>요청 ID</th>
                        <th>작업 유형</th>
                        <th>시작 시간</th>
                        <th>경과 시간</th>
                    </tr>
                </thead>
                <tbody>
                    @foreach (var work in status.ActiveWorks)
                    {
                        <tr>
                            <td>@work.RequestId</td>
                            <td>@work.ActionType</td>
                            <td>@work.StartTime.ToLocalTime().ToString("yyyy-MM-dd HH:mm:ss")</td>
                            <td>@((DateTime.UtcNow - work.StartTime).ToString(@"hh\:mm\:ss"))</td>
                        </tr>
                    }
                </tbody>
            </table>
        </div>
    </div>
}

@code {
    private WorkerManagerStatus status;
    private Timer timer;

    // Message of the most recent failed poll, or null when the last poll succeeded.
    private string loadError;

    // Set by Dispose so that a poll finishing afterwards does not touch the renderer.
    private volatile bool disposed;

    protected override void OnInitialized()
    {
        // The callback starts an async Task method rather than being an async
        // lambda: an async lambda on a void delegate is async void, and any
        // exception from it would be unhandled.
        timer = new Timer(state => { _ = RefreshAsync(); }, null, 0, 1000);
    }

    // Fetches the latest status and re-renders; on failure keeps the previous
    // snapshot on screen and shows the error message instead.
    private async Task RefreshAsync()
    {
        try
        {
            status = await WorkerManagerService.GetStatusAsync();
            loadError = null;
        }
        catch (Exception ex)
        {
            loadError = ex.Message;
        }

        if (!disposed)
        {
            await InvokeAsync(StateHasChanged);
        }
    }

    public void Dispose()
    {
        disposed = true;
        timer?.Dispose();
    }
}

// 1. Blazor SSR Client (BlazorApp.Client)
/// <summary>
/// Work request sent from the Blazor client to the worker manager API.
/// </summary>
public class WorkRequest
{
    /// <summary>Processing mode of the request.</summary>
    public string ActionType { get; set; }  // "Sync" or "Async"
    /// <summary>Payload handed to the action that processes the request.</summary>
    public string Data { get; set; }
}

/// <summary>
/// Response returned for a <see cref="WorkRequest"/>.
/// </summary>
public class WorkResponse
{
    /// <summary>Whether the request was accepted or processed successfully.</summary>
    public bool Success { get; set; }
    /// <summary>Result text: either the action's output or a status message.</summary>
    public string Result { get; set; }
}

/// <summary>
/// Client-side contract for submitting work to the worker manager.
/// </summary>
public interface IWorkerManagerService
{
    /// <summary>Submits a work request and returns the response for it.</summary>
    /// <param name="request">The work to process.</param>
    Task<WorkResponse> ProcessRequestAsync(WorkRequest request);
}

/// <summary>
/// Client-side implementation of <see cref="IWorkerManagerService"/> that
/// forwards requests to the worker manager HTTP API.
/// </summary>
public class WorkerManagerService : IWorkerManagerService
{
    private readonly HttpClient _httpClient;

    /// <param name="httpClient">Client whose base address points at the worker manager API.</param>
    public WorkerManagerService(HttpClient httpClient)
    {
        _httpClient = httpClient;
    }

    /// <summary>
    /// Posts the request as JSON to <c>api/workermanager/process</c> and
    /// returns the deserialized response.
    /// </summary>
    /// <param name="request">The work to process.</param>
    /// <returns>The response body deserialized as <see cref="WorkResponse"/>.</returns>
    /// <exception cref="HttpRequestException">The API answered with a non-success status code.</exception>
    /// <exception cref="InvalidOperationException">The API answered with an empty (JSON null) body.</exception>
    public async Task<WorkResponse> ProcessRequestAsync(WorkRequest request)
    {
        // Dispose the response so its content stream and connection are released promptly.
        using var response = await _httpClient.PostAsJsonAsync("api/workermanager/process", request);
        response.EnsureSuccessStatusCode();

        // ReadFromJsonAsync yields null for a JSON "null" body; fail here rather
        // than hand a null response to the caller.
        var result = await response.Content.ReadFromJsonAsync<WorkResponse>();
        return result ?? throw new InvalidOperationException("Worker manager returned an empty response body.");
    }
}

// Blazor Page Example
@page "/"
@inject IWorkerManagerService WorkerManagerService

@* Demo page: one button submits a "Sync" request, the other an "Async" request. *@

<button @onclick="ProcessSync">동기 작업 실행</button>
<button @onclick="ProcessAsync">비동기 작업 실행</button>

@code {
    // Builds a request with the given action type and sends it to the worker manager.
    private Task<WorkResponse> SubmitAsync(string actionType)
    {
        var request = new WorkRequest();
        request.ActionType = actionType;
        request.Data = "Some data";

        return WorkerManagerService.ProcessRequestAsync(request);
    }

    // Click handler for the synchronous button.
    private async Task ProcessSync()
    {
        var result = await SubmitAsync("Sync");
        // Handle result
    }

    // Click handler for the asynchronous button.
    private async Task ProcessAsync()
    {
        var result = await SubmitAsync("Async");
        // Handle result
    }
}

// 2. Worker Manager (WorkerManager.API)
/// <summary>
/// HTTP front end for <see cref="IWorkerManagerService"/>: accepts work requests
/// and returns the service's response.
/// </summary>
[ApiController]
[Route("api/[controller]")]
public class WorkerManagerController : ControllerBase
{
    private readonly IWorkerManagerService _manager;

    public WorkerManagerController(IWorkerManagerService workerManagerService)
        => _manager = workerManagerService;

    /// <summary>POST process: forwards the request to the service and returns its response.</summary>
    [HttpPost("process")]
    public async Task<IActionResult> ProcessRequest([FromBody] WorkRequest request)
        => Ok(await _manager.ProcessRequestAsync(request));
}

/// <summary>
/// Queues incoming work requests and hands them to workers, running at most
/// <c>MaxConcurrentWorkers</c> items at a time. Requests with action type
/// "Async" are acknowledged immediately; all others wait for the worker's response.
/// </summary>
public class WorkerManagerService : IWorkerManagerService
{
    private readonly IWorkerClient _workerClient;
    private readonly WorkQueue _workQueue;
    private readonly Dictionary<string, TaskCompletionSource<WorkResponse>> _pendingTasks;
    private readonly SemaphoreSlim _workerSemaphore;
    private const int MaxConcurrentWorkers = 3;

    /// <summary>Creates the service with an empty queue and all worker slots free.</summary>
    /// <param name="workerClient">Client used to hand work items to a worker.</param>
    public WorkerManagerService(IWorkerClient workerClient)
    {
        _workerClient = workerClient;
        _workQueue = new WorkQueue();
        _pendingTasks = new Dictionary<string, TaskCompletionSource<WorkResponse>>();
        _workerSemaphore = new SemaphoreSlim(MaxConcurrentWorkers);
    }

    /// <summary>
    /// Enqueues the request and starts draining the queue. "Async" requests get an
    /// immediate acknowledgement; any other request waits up to five minutes for
    /// the worker's response.
    /// </summary>
    /// <param name="request">The work to process.</param>
    /// <returns>The acknowledgement or the worker's response.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="request"/> is null.</exception>
    /// <exception cref="TimeoutException">No response arrived within five minutes.</exception>
    public async Task<WorkResponse> ProcessRequestAsync(WorkRequest request)
    {
        // Reject null before anything is enqueued; otherwise the item would sit in
        // the queue and fail later inside a worker slot.
        if (request == null)
        {
            throw new ArgumentNullException(nameof(request));
        }

        var requestId = Guid.NewGuid().ToString();
        var fireAndForget = request.ActionType == "Async";
        TaskCompletionSource<WorkResponse> completion = null;

        if (!fireAndForget)
        {
            // Only callers that actually wait get a pending entry; registering one
            // for "Async" requests would leave it in the dictionary forever.
            // Continuations run asynchronously so the waiting caller does not resume
            // inline on the worker path before the worker slot has been released.
            completion = new TaskCompletionSource<WorkResponse>(TaskCreationOptions.RunContinuationsAsynchronously);
            lock (_pendingTasks)
            {
                _pendingTasks[requestId] = completion;
            }
        }

        _workQueue.Enqueue(new QueuedWork
        {
            RequestId = requestId,
            Request = request
        });

        // Fire and forget: the drain loop runs independently of this request.
        _ = ProcessQueueAsync();

        if (fireAndForget)
        {
            return new WorkResponse { Success = true, Result = "작업이 큐에 등록되었습니다." };
        }

        using var cts = new CancellationTokenSource(TimeSpan.FromMinutes(5));
        try
        {
            return await completion.Task.WaitAsync(cts.Token);
        }
        catch (OperationCanceledException)
        {
            // Nobody is waiting any more; drop the entry so it cannot linger if the
            // worker never answers.
            lock (_pendingTasks)
            {
                _pendingTasks.Remove(requestId);
            }

            throw new TimeoutException("Worker processing timed out");
        }
    }

    /// <summary>
    /// Drains the queue: for each item, waits for a free worker slot and then starts
    /// processing it without awaiting completion.
    /// </summary>
    private async Task ProcessQueueAsync()
    {
        while (_workQueue.TryDequeue(out var work))
        {
            await _workerSemaphore.WaitAsync();

            // ProcessWorkItemAsync is an async method: it never throws synchronously
            // and releases the slot in its own finally block, so no catch is needed here.
            _ = ProcessWorkItemAsync(work);
        }
    }

    /// <summary>
    /// Sends one work item to the worker and completes the caller's pending task (if
    /// one is registered) with the response or the failure. Always releases the
    /// worker slot.
    /// </summary>
    /// <param name="work">The dequeued work item; a worker slot must already be held for it.</param>
    private async Task ProcessWorkItemAsync(QueuedWork work)
    {
        try
        {
            var response = await _workerClient.ProcessAsync(work.Request);

            // Complete whatever is registered for this request, regardless of the
            // action type, so that every waiting caller is answered.
            TakePendingTask(work.RequestId)?.TrySetResult(response);
        }
        catch (Exception ex)
        {
            // NOTE(review): failures of requests without a waiting caller ("Async")
            // are dropped here because no logger is available in this class.
            TakePendingTask(work.RequestId)?.TrySetException(ex);
        }
        finally
        {
            _workerSemaphore.Release();
        }
    }

    /// <summary>
    /// Removes and returns the pending task registered for a request, or
    /// <c>null</c> when none is registered.
    /// </summary>
    private TaskCompletionSource<WorkResponse> TakePendingTask(string requestId)
    {
        lock (_pendingTasks)
        {
            return _pendingTasks.Remove(requestId, out var pending) ? pending : null;
        }
    }
}

// 3. Worker (Worker.API)
/// <summary>
/// Worker API endpoint: runs the custom action inline, or queues it as a
/// background work item when the request's action type is "Async".
/// </summary>
[ApiController]
[Route("api/[controller]")]
public class WorkerController : ControllerBase
{
    private readonly ICustomActionService _actions;
    private readonly IBackgroundTaskQueue _backgroundQueue;

    public WorkerController(
        ICustomActionService customActionService,
        IBackgroundTaskQueue taskQueue)
    {
        _actions = customActionService;
        _backgroundQueue = taskQueue;
    }

    /// <summary>
    /// POST process: executes the action for <paramref name="request"/> and
    /// returns its result, or an acknowledgement for background requests.
    /// </summary>
    [HttpPost("process")]
    public async Task<IActionResult> Process([FromBody] WorkRequest request)
    {
        var runInBackground = request.ActionType == "Async";

        if (!runInBackground)
        {
            // Inline path: wait for the action and return its output.
            var output = await _actions.ExecuteActionAsync(request.Data);
            var completed = new WorkResponse();
            completed.Success = true;
            completed.Result = output;
            return Ok(completed);
        }

        // Background path: hand the action to the queue and acknowledge right away.
        Func<CancellationToken, Task> workItem = async _ =>
        {
            await _actions.ExecuteActionAsync(request.Data);
        };
        await _backgroundQueue.QueueBackgroundWorkItemAsync(workItem);

        var accepted = new WorkResponse();
        accepted.Success = true;
        accepted.Result = "작업이 백그라운드에서 실행됩니다.";
        return Ok(accepted);
    }
}

// Custom Action Service
/// <summary>
/// Contract for the action that a worker executes for a request.
/// </summary>
public interface ICustomActionService
{
    /// <summary>Executes the action for the given payload and returns its result text.</summary>
    /// <param name="data">The request payload.</param>
    Task<string> ExecuteActionAsync(string data);
}

/// <summary>
/// Sample action: waits one second to stand in for real work, then echoes the
/// payload back with a "Processed: " prefix.
/// </summary>
public class CustomActionService : ICustomActionService
{
    /// <summary>Runs the sample action for <paramref name="data"/>.</summary>
    /// <returns>The payload prefixed with "Processed: ".</returns>
    public async Task<string> ExecuteActionAsync(string data)
    {
        // Placeholder for the real work: simulate an operation that takes time.
        var simulatedDuration = TimeSpan.FromSeconds(1);
        await Task.Delay(simulatedDuration);

        return "Processed: " + data;
    }
}

// Background Task Queue
/// <summary>
/// Queue of background work items, each a delegate that takes a cancellation
/// token and returns a task.
/// </summary>
public interface IBackgroundTaskQueue
{
    /// <summary>Adds a work item to the queue.</summary>
    /// <param name="workItem">The delegate to run in the background.</param>
    ValueTask QueueBackgroundWorkItemAsync(Func<CancellationToken, Task> workItem);
    /// <summary>Takes the next work item from the queue.</summary>
    /// <param name="cancellationToken">Token that cancels the wait for an item.</param>
    Task<Func<CancellationToken, Task>> DequeueAsync(CancellationToken cancellationToken);
}

/// <summary>
/// <see cref="IBackgroundTaskQueue"/> backed by a bounded channel; when the
/// channel is full, writers wait (<see cref="BoundedChannelFullMode.Wait"/>).
/// </summary>
public class BackgroundTaskQueue : IBackgroundTaskQueue
{
    private readonly Channel<Func<CancellationToken, Task>> _queue;

    /// <summary>Creates the queue.</summary>
    /// <param name="capacity">Maximum number of work items held at once.</param>
    public BackgroundTaskQueue(int capacity = 100)
    {
        var options = new BoundedChannelOptions(capacity)
        {
            FullMode = BoundedChannelFullMode.Wait
        };
        _queue = Channel.CreateBounded<Func<CancellationToken, Task>>(options);
    }

    /// <summary>Writes a work item to the channel.</summary>
    /// <param name="workItem">The delegate to run in the background.</param>
    /// <exception cref="ArgumentNullException"><paramref name="workItem"/> is null.</exception>
    public async ValueTask QueueBackgroundWorkItemAsync(Func<CancellationToken, Task> workItem)
    {
        // Fail at the producer: a null item would otherwise only blow up later,
        // when the consumer tries to invoke it.
        if (workItem == null)
        {
            throw new ArgumentNullException(nameof(workItem));
        }

        await _queue.Writer.WriteAsync(workItem);
    }

    /// <summary>Reads the next work item from the channel.</summary>
    /// <param name="cancellationToken">Token that cancels the read.</param>
    public async Task<Func<CancellationToken, Task>> DequeueAsync(CancellationToken cancellationToken)
    {
        return await _queue.Reader.ReadAsync(cancellationToken);
    }
}

// Background Service for processing async tasks
/// <summary>
/// Hosted service that takes work items from an <see cref="IBackgroundTaskQueue"/>
/// one at a time and runs them until the host signals shutdown.
/// </summary>
public class QueuedHostedService : BackgroundService
{
    private readonly IBackgroundTaskQueue _taskQueue;
    private readonly ILogger<QueuedHostedService> _logger;

    /// <param name="taskQueue">Queue the work items are read from.</param>
    /// <param name="logger">Logger for failed work items.</param>
    public QueuedHostedService(
        IBackgroundTaskQueue taskQueue,
        ILogger<QueuedHostedService> logger)
    {
        _taskQueue = taskQueue;
        _logger = logger;
    }

    /// <summary>
    /// Dequeues and executes work items until <paramref name="stoppingToken"/> is
    /// signalled. A failing work item is logged and does not stop the loop.
    /// </summary>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            Func<CancellationToken, Task> workItem;
            try
            {
                workItem = await _taskQueue.DequeueAsync(stoppingToken);
            }
            catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
            {
                // Shutdown was requested while waiting for work: leave quietly
                // instead of faulting the service's task.
                break;
            }

            try
            {
                await workItem(stoppingToken);
            }
            catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
            {
                // The work item was cancelled by shutdown; that is not an error.
                break;
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Error occurred executing background work item.");
            }
        }
    }
}



// Worker Manager에 상태 모델 추가
/// <summary>
/// Snapshot of the worker manager's load: queue length, slots in use and the
/// work items that are currently running.
/// </summary>
public class WorkerManagerStatus
{
    /// <summary>Number of work items waiting in the queue.</summary>
    public int QueuedWorkCount { get; set; }

    /// <summary>Number of worker slots currently in use.</summary>
    public int ActiveWorkerCount { get; set; }

    /// <summary>
    /// Work items that are currently running. Starts as an empty list so a
    /// freshly constructed instance never exposes <c>null</c> to consumers.
    /// </summary>
    public List<WorkItemStatus> ActiveWorks { get; set; } = new();
}

/// <summary>
/// Describes a single work item that the worker manager is tracking as running.
/// </summary>
public class WorkItemStatus
{
    /// <summary>Identifier of the queued request this entry belongs to.</summary>
    public string RequestId { get; set; }
    /// <summary>Action type copied from the originating request.</summary>
    public string ActionType { get; set; }
    /// <summary>Time at which processing of the work item started.</summary>
    public DateTime StartTime { get; set; }
}

// Worker Manager Service에 상태 추적 기능 추가
/// <summary>
/// Contract for submitting work to the worker manager and querying its load.
/// </summary>
public interface IWorkerManagerService
{
    /// <summary>Submits a work request and returns the response for it.</summary>
    /// <param name="request">The work to process.</param>
    Task<WorkResponse> ProcessRequestAsync(WorkRequest request);
    /// <summary>Returns a snapshot of the queued and active work.</summary>
    Task<WorkerManagerStatus> GetStatusAsync();
}

/// <summary>
/// Worker manager service with status tracking: every work item is recorded in
/// <c>_activeWorks</c> while it runs so that <see cref="GetStatusAsync"/> can
/// report it.
/// </summary>
/// <remarks>
/// NOTE(review): this snippet only shows the status-related members; the
/// interface's <c>ProcessRequestAsync</c> is not defined here.
/// </remarks>
public class WorkerManagerService : IWorkerManagerService
{
    private readonly IWorkerClient _workerClient;
    private readonly WorkQueue _workQueue;
    private readonly Dictionary<string, TaskCompletionSource<WorkResponse>> _pendingTasks;
    private readonly SemaphoreSlim _workerSemaphore;
    private readonly ConcurrentDictionary<string, WorkItemStatus> _activeWorks;
    private const int MaxConcurrentWorkers = 3;

    /// <summary>Creates the service with an empty queue and all worker slots free.</summary>
    /// <param name="workerClient">Client used to hand work items to a worker.</param>
    public WorkerManagerService(IWorkerClient workerClient)
    {
        _workerClient = workerClient;
        _workQueue = new WorkQueue();
        _pendingTasks = new Dictionary<string, TaskCompletionSource<WorkResponse>>();
        _workerSemaphore = new SemaphoreSlim(MaxConcurrentWorkers);
        _activeWorks = new ConcurrentDictionary<string, WorkItemStatus>();
    }

    /// <summary>
    /// Builds a snapshot of the queue length, the number of worker slots in use
    /// and the currently running work items.
    /// </summary>
    /// <returns>An already completed task holding the snapshot.</returns>
    public Task<WorkerManagerStatus> GetStatusAsync()
    {
        // Nothing here is asynchronous, so return a completed task instead of
        // paying for an async state machine that never awaits.
        var snapshot = new WorkerManagerStatus
        {
            QueuedWorkCount = _workQueue.Count,
            // Slots in use = total slots minus the slots still available.
            ActiveWorkerCount = MaxConcurrentWorkers - _workerSemaphore.CurrentCount,
            ActiveWorks = _activeWorks.Values.ToList()
        };

        return Task.FromResult(snapshot);
    }

    /// <summary>
    /// Runs one work item through the worker client, tracks it in
    /// <c>_activeWorks</c> while it runs and completes the caller's pending task
    /// (if one is registered) with the response or the failure.
    /// </summary>
    /// <param name="work">The dequeued work item; a worker slot must already be held for it.</param>
    private async Task ProcessWorkItemAsync(QueuedWork work)
    {
        try
        {
            // Registered inside the try so that a failure while building the
            // status entry still reaches the finally block and frees the slot.
            _activeWorks.TryAdd(work.RequestId, new WorkItemStatus
            {
                RequestId = work.RequestId,
                ActionType = work.Request.ActionType,
                StartTime = DateTime.UtcNow
            });

            var response = await _workerClient.ProcessAsync(work.Request);

            // Complete whatever is registered for this request, regardless of the
            // action type: otherwise entries for other types are never removed
            // and their callers are never answered.
            TakePendingTask(work.RequestId)?.TrySetResult(response);
        }
        catch (Exception ex)
        {
            TakePendingTask(work.RequestId)?.TrySetException(ex);
        }
        finally
        {
            _activeWorks.TryRemove(work.RequestId, out _);
            _workerSemaphore.Release();
        }
    }

    /// <summary>
    /// Removes and returns the pending task registered for a request, or
    /// <c>null</c> when none is registered.
    /// </summary>
    private TaskCompletionSource<WorkResponse> TakePendingTask(string requestId)
    {
        lock (_pendingTasks)
        {
            return _pendingTasks.Remove(requestId, out var pending) ? pending : null;
        }
    }
}

// WorkQueue에 카운트 프로퍼티 추가
/// <summary>
/// Thin wrapper around a <see cref="ConcurrentQueue{T}"/> of pending work items
/// that also exposes the current queue length.
/// </summary>
public class WorkQueue
{
    private readonly ConcurrentQueue<QueuedWork> _items = new ConcurrentQueue<QueuedWork>();

    /// <summary>Number of work items currently waiting in the queue.</summary>
    public int Count
    {
        get { return _items.Count; }
    }

    /// <summary>Appends a work item to the end of the queue.</summary>
    public void Enqueue(QueuedWork work) => _items.Enqueue(work);

    /// <summary>Tries to remove the item at the front of the queue.</summary>
    /// <returns><c>true</c> when an item was removed; otherwise <c>false</c>.</returns>
    public bool TryDequeue(out QueuedWork work) => _items.TryDequeue(out work);
}

// Worker Manager Controller에 상태 조회 API 추가
/// <summary>
/// HTTP front end for <see cref="IWorkerManagerService"/>: exposes the status
/// snapshot and the work submission endpoint.
/// </summary>
[ApiController]
[Route("api/[controller]")]
public class WorkerManagerController : ControllerBase
{
    private readonly IWorkerManagerService _manager;

    public WorkerManagerController(IWorkerManagerService workerManagerService)
        => _manager = workerManagerService;

    /// <summary>GET status: returns the current worker manager status.</summary>
    [HttpGet("status")]
    public async Task<IActionResult> GetStatus()
        => Ok(await _manager.GetStatusAsync());

    /// <summary>POST process: forwards the request to the service and returns its response.</summary>
    [HttpPost("process")]
    public async Task<IActionResult> ProcessRequest([FromBody] WorkRequest request)
        => Ok(await _manager.ProcessRequestAsync(request));
}

// Blazor 클라이언트에 모니터링 페이지 추가
@page "/monitor"
@inject IWorkerManagerService WorkerManagerService
@implements IDisposable

@* Monitoring page: polls the worker manager status once per second and shows the
   queue length, the number of busy workers and the list of running work items. *@

<h3>Worker 모니터링</h3>

@if (loadError != null)
{
    <div class="alert alert-warning">@loadError</div>
}

<div class="card mb-4">
    <div class="card-body">
        <div class="row">
            <div class="col-md-6">
                <h5>대기 중인 작업</h5>
                <h2>@status?.QueuedWorkCount</h2>
            </div>
            <div class="col-md-6">
                <h5>실행 중인 작업</h5>
                <h2>@status?.ActiveWorkerCount</h2>
            </div>
        </div>
    </div>
</div>

@if (status?.ActiveWorks?.Any() == true)
{
    <div class="card">
        <div class="card-header">
            <h5>실행 중인 작업 목록</h5>
        </div>
        <div class="card-body">
            <table class="table">
                <thead>
                    <tr>
                        <th>요청 ID</th>
                        <th>작업 유형</th>
                        <th>시작 시간</th>
                        <th>경과 시간</th>
                    </tr>
                </thead>
                <tbody>
                    @foreach (var work in status.ActiveWorks)
                    {
                        <tr>
                            <td>@work.RequestId</td>
                            <td>@work.ActionType</td>
                            <td>@work.StartTime.ToLocalTime().ToString("yyyy-MM-dd HH:mm:ss")</td>
                            <td>@((DateTime.UtcNow - work.StartTime).ToString(@"hh\:mm\:ss"))</td>
                        </tr>
                    }
                </tbody>
            </table>
        </div>
    </div>
}

@code {
    private WorkerManagerStatus status;
    private Timer timer;

    // Message of the most recent failed poll, or null when the last poll succeeded.
    private string loadError;

    // Set by Dispose so that a poll finishing afterwards does not touch the renderer.
    private volatile bool disposed;

    protected override void OnInitialized()
    {
        // The callback starts an async Task method rather than being an async
        // lambda: an async lambda on a void delegate is async void, and any
        // exception from it would be unhandled.
        timer = new Timer(state => { _ = RefreshAsync(); }, null, 0, 1000);
    }

    // Fetches the latest status and re-renders; on failure keeps the previous
    // snapshot on screen and shows the error message instead.
    private async Task RefreshAsync()
    {
        try
        {
            status = await WorkerManagerService.GetStatusAsync();
            loadError = null;
        }
        catch (Exception ex)
        {
            loadError = ex.Message;
        }

        if (!disposed)
        {
            await InvokeAsync(StateHasChanged);
        }
    }

    public void Dispose()
    {
        disposed = true;
        timer?.Dispose();
    }
}