카테고리 없음
large-scale data streaming.
by keisoft
2024. 12. 9.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using System.Text.Json;
using System.Text.Json.Serialization;
// 데이터 모델 정의
public class TestData
{
public string Key { get; set; }
public DataClass Data { get; set; }
}
public class DataClass
{
public int ChartKey { get; set; }
public List<ChartData> ChartDataList { get; set; }
}
public class ChartData
{
public float X { get; set; }
public float Y { get; set; }
}
// API 컨트롤러
[ApiController]
[Route("api/[controller]")]
public class DataController : ControllerBase
{
// 방법 1: 대용량 데이터 스트리밍 (IAsyncEnumerable)
[HttpPost("stream-large-data")]
public async IAsyncEnumerable<ChartData> StreamLargeData()
{
// 200만건의 대용량 데이터 생성 예시
for (int i = 0; i < 2_000_000; i++)
{
yield return new ChartData
{
X = i,
Y = (float)Math.Sin(i * 0.01)
};
// 대용량 데이터 생성 중 잠시 대기 (선택적)
await Task.Delay(1);
}
}
// 방법 2: 청크 단위 데이터 전송
[HttpPost("chunk-large-data")]
public async Task<IActionResult> ChunkedDataTransfer()
{
// 응답 스트림 설정
Response.Headers.Add("Content-Type", "application/json");
Response.Headers.Add("Transfer-Encoding", "chunked");
// JSON 직렬화 옵션 설정
var options = new JsonSerializerOptions
{
WriteIndented = false,
PropertyNamingPolicy = JsonNamingPolicy.CamelCase
};
// 스트림 writer 준비
await using var writer = new StreamWriter(Response.Body);
await writer.WriteAsync("[");
// 200만건 데이터 청크 단위로 전송
bool isFirstItem = true;
for (int i = 0; i < 2_000_000; i++)
{
var chartData = new ChartData
{
X = i,
Y = (float)Math.Sin(i * 0.01)
};
// 첫 번째 아이템이 아니면 콤마 추가
if (!isFirstItem)
{
await writer.WriteAsync(",");
}
isFirstItem = false;
// JSON으로 직렬화하여 스트림에 직접 쓰기
var jsonData = JsonSerializer.Serialize(chartData, options);
await writer.WriteAsync(jsonData);
await writer.FlushAsync();
}
await writer.WriteAsync("]");
await writer.FlushAsync();
return new EmptyResult();
}
// 방법 3: 페이징된 데이터 전송
[HttpPost("paged-large-data")]
public IActionResult PagedDataTransfer(
[FromQuery] int pageSize = 10000,
[FromQuery] int pageNumber = 0)
{
// 전체 데이터 생성
var allData = GenerateLargeDataSet();
// 페이징 처리
var pagedData = allData
.Skip(pageNumber * pageSize)
.Take(pageSize)
.ToList();
return Ok(new
{
TotalCount = allData.Count,
PageSize = pageSize,
PageNumber = pageNumber,
Data = pagedData
});
}
// 대용량 데이터셋 생성 헬퍼 메서드
private List<ChartData> GenerateLargeDataSet()
{
return Enumerable.Range(0, 2_000_000)
.Select(i => new ChartData
{
X = i,
Y = (float)Math.Sin(i * 0.01)
})
.ToList();
}
}
// 클라이언트에서 사용할 수 있는 데이터 처리 예시 서비스
public class LargeDataService
{
private readonly HttpClient _httpClient;
public LargeDataService(HttpClient httpClient)
{
_httpClient = httpClient;
}
// 스트리밍 데이터 처리 메서드
public async Task ProcessStreamingData()
{
var response = await _httpClient.PostAsync("/api/data/stream-large-data", null);
response.EnsureSuccessStatusCode();
// 스트림 처리 로직 구현
await using var stream = await response.Content.ReadAsStreamAsync();
using var reader = new StreamReader(stream);
string line;
while ((line = await reader.ReadLineAsync()) != null)
{
// 각 라인(데이터) 처리
// 예: 데이터베이스 저장, 추가 처리 등
Console.WriteLine(line);
}
}
}
// 대용량 데이터 처리를 위한 JavaScript 클라이언트 코드
class LargeDataService {
// 방법 1: 스트림 리더를 사용한 점진적 데이터 처리
static async fetchLargeDataStream(url) {
try {
// fetch API로 응답을 스트림으로 받음
const response = await fetch(url, {
method: 'POST',
headers: {
'Content-Type': 'application/json'
}
});
// 응답의 바디를 리더로 가져옴
const reader = response.body.getReader();
// 디코더 생성 (텍스트 디코딩을 위해)
const decoder = new TextDecoder();
// 누적 데이터를 저장할 배열
const processedData = [];
while (true) {
// 스트림에서 데이터 청크 읽기
const { done, value } = await reader.read();
// 스트림 종료 시 반복 중단
if (done) break;
// 바이트 청크를 텍스트로 디코딩
const decodedChunk = decoder.decode(value, { stream: true });
// 디코딩된 청크 파싱 및 처리
try {
const parsedChunk = JSON.parse(decodedChunk);
// 데이터 배열에 추가
if (Array.isArray(parsedChunk)) {
processedData.push(...parsedChunk);
} else {
processedData.push(parsedChunk);
}
// 중간 처리 로직 (선택적)
console.log('Processed chunk, current data size:', processedData.length);
} catch (parseError) {
console.warn('Parsing chunk failed:', parseError);
}
}
return processedData;
} catch (error) {
console.error('데이터 스트리밍 중 오류:', error);
throw error;
}
}
// 방법 2: 페이징된 데이터 처리
static async fetchLargeDataPaged(url, pageSize = 10000) {
let allData = [];
let pageNumber = 0;
let totalCount;
while (true) {
try {
// 페이지별 데이터 요청
const response = await fetch(`${url}?pageSize=${pageSize}&pageNumber=${pageNumber}`, {
method: 'POST',
headers: {
'Content-Type': 'application/json'