본문 바로가기
카테고리 없음

large-scale data streaming.

by keisoft 2024. 12. 9.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using System.Text.Json;
using System.Text.Json.Serialization;

// 데이터 모델 정의
public class TestData
{
    public string Key { get; set; }
    public DataClass Data { get; set; }
}

public class DataClass
{
    public int ChartKey { get; set; }
    public List<ChartData> ChartDataList { get; set; }
}

public class ChartData
{
    public float X { get; set; }
    public float Y { get; set; }
}

// API 컨트롤러
[ApiController]
[Route("api/[controller]")]
public class DataController : ControllerBase
{
    // 방법 1: 대용량 데이터 스트리밍 (IAsyncEnumerable)
    [HttpPost("stream-large-data")]
    public async IAsyncEnumerable<ChartData> StreamLargeData()
    {
        // 200만건의 대용량 데이터 생성 예시
        for (int i = 0; i < 2_000_000; i++)
        {
            yield return new ChartData
            {
                X = i,
                Y = (float)Math.Sin(i * 0.01)
            };

            // 대용량 데이터 생성 중 잠시 대기 (선택적)
            await Task.Delay(1);
        }
    }

    // 방법 2: 청크 단위 데이터 전송
    [HttpPost("chunk-large-data")]
    public async Task<IActionResult> ChunkedDataTransfer()
    {
        // 응답 스트림 설정
        Response.Headers.Add("Content-Type", "application/json");
        Response.Headers.Add("Transfer-Encoding", "chunked");

        // JSON 직렬화 옵션 설정
        var options = new JsonSerializerOptions
        {
            WriteIndented = false,
            PropertyNamingPolicy = JsonNamingPolicy.CamelCase
        };

        // 스트림 writer 준비
        await using var writer = new StreamWriter(Response.Body);
        await writer.WriteAsync("[");

        // 200만건 데이터 청크 단위로 전송
        bool isFirstItem = true;
        for (int i = 0; i < 2_000_000; i++)
        {
            var chartData = new ChartData
            {
                X = i,
                Y = (float)Math.Sin(i * 0.01)
            };

            // 첫 번째 아이템이 아니면 콤마 추가
            if (!isFirstItem)
            {
                await writer.WriteAsync(",");
            }
            isFirstItem = false;

            // JSON으로 직렬화하여 스트림에 직접 쓰기
            var jsonData = JsonSerializer.Serialize(chartData, options);
            await writer.WriteAsync(jsonData);
            await writer.FlushAsync();
        }

        await writer.WriteAsync("]");
        await writer.FlushAsync();

        return new EmptyResult();
    }

    // 방법 3: 페이징된 데이터 전송
    [HttpPost("paged-large-data")]
    public IActionResult PagedDataTransfer(
        [FromQuery] int pageSize = 10000, 
        [FromQuery] int pageNumber = 0)
    {
        // 전체 데이터 생성
        var allData = GenerateLargeDataSet();

        // 페이징 처리
        var pagedData = allData
            .Skip(pageNumber * pageSize)
            .Take(pageSize)
            .ToList();

        return Ok(new 
        {
            TotalCount = allData.Count,
            PageSize = pageSize,
            PageNumber = pageNumber,
            Data = pagedData
        });
    }

    // 대용량 데이터셋 생성 헬퍼 메서드
    private List<ChartData> GenerateLargeDataSet()
    {
        return Enumerable.Range(0, 2_000_000)
            .Select(i => new ChartData 
            { 
                X = i, 
                Y = (float)Math.Sin(i * 0.01) 
            })
            .ToList();
    }
}

// 클라이언트에서 사용할 수 있는 데이터 처리 예시 서비스
public class LargeDataService
{
    private readonly HttpClient _httpClient;

    public LargeDataService(HttpClient httpClient)
    {
        _httpClient = httpClient;
    }

    // 스트리밍 데이터 처리 메서드
    public async Task ProcessStreamingData()
    {
        var response = await _httpClient.PostAsync("/api/data/stream-large-data", null);
        response.EnsureSuccessStatusCode();

        // 스트림 처리 로직 구현
        await using var stream = await response.Content.ReadAsStreamAsync();
        using var reader = new StreamReader(stream);

        string line;
        while ((line = await reader.ReadLineAsync()) != null)
        {
            // 각 라인(데이터) 처리
            // 예: 데이터베이스 저장, 추가 처리 등
            Console.WriteLine(line);
        }
    }
}
// 대용량 데이터 처리를 위한 JavaScript 클라이언트 코드

class LargeDataService {
    // 방법 1: 스트림 리더를 사용한 점진적 데이터 처리
    static async fetchLargeDataStream(url) {
        try {
            // fetch API로 응답을 스트림으로 받음
            const response = await fetch(url, {
                method: 'POST',
                headers: {
                    'Content-Type': 'application/json'
                }
            });
            
            // 응답의 바디를 리더로 가져옴
            const reader = response.body.getReader();
            
            // 디코더 생성 (텍스트 디코딩을 위해)
            const decoder = new TextDecoder();
            
            // 누적 데이터를 저장할 배열
            const processedData = [];
            
            while (true) {
                // 스트림에서 데이터 청크 읽기
                const { done, value } = await reader.read();
                
                // 스트림 종료 시 반복 중단
                if (done) break;
                
                // 바이트 청크를 텍스트로 디코딩
                const decodedChunk = decoder.decode(value, { stream: true });
                
                // 디코딩된 청크 파싱 및 처리
                try {
                    const parsedChunk = JSON.parse(decodedChunk);
                    
                    // 데이터 배열에 추가
                    if (Array.isArray(parsedChunk)) {
                        processedData.push(...parsedChunk);
                    } else {
                        processedData.push(parsedChunk);
                    }
                    
                    // 중간 처리 로직 (선택적)
                    console.log('Processed chunk, current data size:', processedData.length);
                } catch (parseError) {
                    console.warn('Parsing chunk failed:', parseError);
                }
            }
            
            return processedData;
        } catch (error) {
            console.error('데이터 스트리밍 중 오류:', error);
            throw error;
        }
    }

    // 방법 2: 페이징된 데이터 처리
    static async fetchLargeDataPaged(url, pageSize = 10000) {
        let allData = [];
        let pageNumber = 0;
        let totalCount;

        while (true) {
            try {
                // 페이지별 데이터 요청
                const response = await fetch(`${url}?pageSize=${pageSize}&pageNumber=${pageNumber}`, {
                    method: 'POST',
                    headers: {
                        'Content-Type': 'application/json'