😎 이 글에서 다룰 것
성능적 측면에서 고려했던 DB 접근 기술인 ItemReader 선택 고려사항과 Bulk 처리 Write에서의 고려사항
ItemReader 선택
앞서 언급한 내용처럼 배치작업은 결국 일괄적으로 많은 데이터를 한번에 모아서 처리하는 작업이다.
즉, 단순한게 표현하자면 요구사항에 맞춰서 데이터를 읽고 처리해주는 것이 핵심인 것이다.
이를 다르게 생각하면 실시간 처리가 어려운 대용량의 데이터를 다루기 위한 기술으로도 볼 수 있다.
따라서 자연스럽게 데이터 처리와 관련하여 효율성을 높이는 것에 최대한 초점을 맞춰 개발을 진행하였고 과정에서 고려했던 내용들을 정리해보겠다.
먼저 데이터를 읽어오는 ItemReader 부분이다.
ItemReader ?
영어 단어가 의미하는 뜻이 직관적으로 다가오듯, 스프링 배치에서 다양한 입력으로부터 데이터를 읽도록 제공하는 인터페이스다.
스프링 배치는 csv, txt, XML, Json, DB, 메세지 큐 ... 등등 다양한 입력으로부터 데이터를 읽을 수 있도록 구현체를 제공하기에 데이터를 읽음에 필요한 구현체를 사용하면 된다.

위와 같이 많은 구현체들 중에서 DB Reader로 CursorItemReader와 PagingItemReader를 제공한다.
그리고 DB 접근 기술로는 JDBC와 JPA 방식이 있다. 따라서 총 4개 JdbcCursorItemReader, JdbcPagingItemReader, JPACursorItemReader, JPAPagingItemReader를 고려할 수 있었다.
결론적으로 말하자면, 해당 프로젝트에서는 JdbcCursorItemReader를 사용하였다.
CursorItemReader와 PagingItemReader
그 이유를 살펴보자면, 먼저 Cursor방식과 Paging 방식에 대한 고려가 있었다.

CursorItemReader
cursor기반의 ItemReader는 JDBC ResultSet의 기본 기능으로 ResultSet의 open과 더불어 next() 메소드를 호출하고 DB의 데이터가 반환되는 원리이다.
즉, 한번의 쿼리로 커넥션을 맺고 데이터를 Streaming으로 읽어오기에 고성능 배치 처리에 적합하다.
게다가 FetchSize를 설정하면, 단건 데이터를 Streaming하는 것이 아닌 정해진 데이터 크기에 따른 IO를 발생시키기에 뒤에 이어서 말할 Paging 방식과 비교하여 IO 횟수로 인한 성능적 저하는 없다고 고려하였다.
다만, 커넥션이 계속 유지되며 데이터를 처리하기에 DB와의 TimeOut을 여유있게 설정해줘야 커넥션 끊임으로 발생하는 배치 작업 중단이 발생하지 않는다.
아래는 프로젝트에서 사용한 ItemReader의 일부분이다.
@Bean public ItemReader<Contract> jdbcContractItemReader() { String sql = """ select con.contract_id, con.invoice_type_id, con.payment_type_id, con.contract_cycle, con.item_price, con.item_amount, con.is_deleted, con.is_subscription, con.payment_due_cycle from contract con where con.contract_status_id = 2 and con.is_deleted = false """; return new JdbcCursorItemReaderBuilder<Contract>() .name("jdbcContractItemReader") .fetchSize(CHUNK_SIZE) .sql(sql) .rowMapper(new JdbcContractRowMapper()) .dataSource(dataSource) .build(); }
PagingItemReader
Paging 방식의 ItemReader는 위의 Cursor 방식과 달리 필요한 데이터에 대해 쿼리를 반복적으로 실행하여 각 쿼리의 결과에 따른 데이터를 읽어오는 방식이다.
흔히 게시판 구현에 있어서, 페이징의 offset, limit 방식으로 데이터를 읽어오는 것과 같으며,
Batch에서는 offset과 Limit를 지정해준 PageSize에 맞춰서 자동으로 생성하서 반환값을 전달한다.
그리고 필요한 데이터를 읽어오는 과정에서(쿼리문을 날리는) 커넥션을 각각 새롭게 맺는 방식이다.
따라서 Cursor 방식에 비해서 배치 작업이 길어질 경우에 TimeOut에 대한 부담이 매우 낮아진다.
하지만, IO가 발생하는 경우마다 커넥션을 맺고 끊는 방식으로 동작하기에 오버헤드가 Cursor 방식에 비하면 크고 성능적 이점은 적어진다. 게다가 데이터 크기가 커지면 커질수록 offset 설정값 이전의 데이터를 다 읽고 지정된 offset 값에 도달해서야 limit만큼 데이터를 읽어오는 방식이기에 성능이 매우 떨어진다.
물론 zero-offset 구현을 통한 대책도 있기도 하다.( 더 자세한 내용은 구글링을 통해 살펴보자 )
아래는 위의 CursorItemReader와 같은 기능을 하는 JpaPagingItemReader의 코드이다.
@Bean public JpaPagingItemReader<Contract> contractItemReader() { JpaPagingItemReader<Contract> reader = new JpaPagingItemReader<>(); reader.setEntityManagerFactory(entityManagerFactory); reader.setQueryString("SELECT c FROM Contract c WHERE c.contractStatus.name = '진행' ORDER BY c.id ASC"); reader.setPageSize(CHUNK_SIZE); return reader; }
CursorItemReader vs PagingItemReader
🤜 두 ItemReader가 작동하는 방식을 나름 정리하자면, PagingItemReader의 경우 offset, limit 기반으로 원하는 데이터를 읽어오는 방식이기에 정해진 범주마다 쿼리를 발생시킨다.
이는 DB와 커넥션을 맺고 끊는 반복적인 작업이 동반되기 때문에 CursorItemReader에 비하면 성능적 이점이 낮다.
(Zero-offset을 사용하면 성능적 향상을 도모할 수 있긴하다.)
다만, 운영의 안정적 측면을 고려해보면 각 데이터를 가져오는 절차마다 커넥션을 맺고 끊기에 특정 데이터에 대한 질의에서 에러가 발생하여도 다른 질의에는 문제가 없다. 그리고 socketTimeOut의 설정에 대한 부담이 적어 안전성 측면에서 우수하다.
추가적으로 Paging 방식을 사용하여 데이터를 읽어온다면, 중복되지 않는 값을 기준으로 정렬 조건이 반드시 포함되어야 한다.
각각 별도의 쿼리이기에 정렬기준이 없다면 쿼리마다 본인들만의 정렬기준을 만들어 실행하게 되기 때문이다.
🤜 한편, CursorItemReader의 경우 한번의 커넥션을 맺고나서 Streaming 형식으로 데이터를 불러와 처리한다. 때문에 앞서 Paging 방식에 비해 분명 커넥션으로 발생하는 비용이 현저히 적으며 성능적인 이점을 얻을 수 있다.
하지만, 이러한 메커니즘은 안정성 측면에서는 좋지 않은데 한번 맺은 커넥션에서 한번의 에러 발생이 곧 작업 전체에 치명적일 수 있기 때문이다. 또한 socketTiemOut을 넉넉하게 설명해줘야 작업에서 연결이 끊기는 현상을 예방할 수 있기에 만약 전체 작업 시간이 오래 걸리는 경우라면 CursorItemReader는 안정성 측면에서 관리하기 어렵고 네트워크 비용이나 리소스 관리도 고려해야한다.
또한 Thread Safe하지 않기에 멀티스레딩과 같은 기술을 사용하려고 한다면 적절하지 않다.
(SynchronizedItemStreamReader로 감싸서 사용하는 방법이 있긴하다. )
위와 같은 정리와 프로젝트에서 사용할 기술로 앞서 언급한 것처럼 JdbcCursorItemReader를 선택하였다.
그 이유는 다음과 같았다.
→ 배치 서버를 따로 띄우고 프로젝트를 진행하였으며, 프로젝트 시나리오에서 배치 작업이 실행되는 시간 동안은 잠시 다른 서버를 막기에 이에 대한 부담은 적다고 판단하였다.
→ 기간 내에 구현 완성이 필요하니 따로 커스텀 구현체를 만드는 일이 없는 것이 좋겠다.
→ CursorItemReader가 성능적 이점이 높기에 빠른 작업처리가 가능하다.
→ 배치 작업을 통해 처리할 데이터의 숫자는 적기에 CursorItemReader를 사용한다고 무리되지 않을 정도라고 판단하였다.
→ JpaCursorItemReader는 데이터를 모두 메모리에 올리고 사용하는 방식으로 OutOfMemory를 유발시키기에 그대로 사용하기에 적합하지 않다.
→ PagingItemReader의 경우에도 배치 특성상 대용량의 데이터를 처리하기 위한 기술인데, 성능을 높이기 위해선 커스텀 구현체가 필요하다고 판단하여 제외
위와 같은 기준으로 CursorItemReader를 선택하였다.
ItemWriter 선택
배치 작업을 간단하게 표현하자면, 결국 일괄된 데이터를 읽고 쓰는 것이기에 앞서 데이터를 읽는 과정과 더불어
' 어떻게 데이터를 출력할 것이냐 ' 도 중요한 부분이다.
해당 단계에서는 앞서 Reader로 읽어온 크기만큼의 데이터를 처리 후에 출력하는 부분으로,
대량의 데이터를 삽입하는 경우 Bulk Insert(Batch Insert)를 수행하는 것이 성능에 있어 좋은 접근이라고 판단하였다.
그리고 크게 고려할 부분은 DB접근 기술과 관련하여 JDBC 와 JPA 선택지가 있었다.
이를 선정하기 위해서 다양한 자료를 조사하니 현재 프로젝트 케이스에서는 " Writer 부분으로 DB에 접근하는 기술에 JPA는 적합하지 않다. "는 기준이 생겼다.
그 이유는 다음과 같았다.
현재 사용하는 DB는 MySQL인데 PK 전략을 IDENTITY로 사용하고 있었다.
이는 auto_increment로 증가한 pk값이 DB 자체에서 생성한 값을 사용하는 것이다.
따라서 데이터 insert 시에 저장할 pk값은 DB에게 위임하는 것이기에 여러 건의 데이터를 생성에 있어
하이버네이트는 JDBC 수준의 Batch Insert를 지원하지 않는다.
( JPA의 영속성 컨텍스트의 작동 방식을 고려해보면, 지원할 수 없는 것이 맞는 것 같다. )
또한, Batch 환경은 읽고 쓰는 부분이 명확한데, 데이터 삽입 과정에서 영속성 컨텍스트나 Dirty Checking과 같은 기능이 필요한 이유가 없다고 판단되었다.
따라서, Writer 부분은 JdbcTemplate을 사용하였고, 대량의 데이터 삽입이 필요한 로직에는 JdbcTemplate.batchUpdate()를 사용하였다.
아래는 프로젝트에서 사용한 코드의 일부이다.
먼저, JdbcTemplate를 사용해서 Batch Insert를 사용한 경우
private void bulkInsertInvoices(List<Invoice> invoices) {
String sql = "insert into invoice (contract_id, invoice_type_id, payment_type_id, payment_status_id, charge_amount, contract_date, due_date, is_deleted, created_at, updated_at)" +
" values (?, ?, ?, ?, ?, ?, ?, false, NOW(), NOW())";
jdbcTemplate.batchUpdate(sql, new BatchPreparedStatementSetter() {
@Override
public void setValues(PreparedStatement ps, int i) throws SQLException {
Invoice invoice = invoices.get(i);
ps.setLong(1, invoice.getContract().getId());
ps.setLong(2, invoice.getInvoiceType().getId());
ps.setLong(3, invoice.getPaymentType().getId());
ps.setLong(4, invoice.getPaymentStatus().getId());
ps.setLong(5, invoice.getChargeAmount());
ps.setTimestamp(6, java.sql.Timestamp.valueOf(invoice.getContractDate()));
ps.setTimestamp(7, java.sql.Timestamp.valueOf(invoice.getDueDate()));
}
@Override
public int getBatchSize() {
return invoices.size();
}
});
}
그리고 JPA를 사용해서 save() 메서드를 사용한 경우
@Override
public void write(Chunk<? extends Contract> chunk) throws Exception {
...
for (Contract contract : chunk) {
boolean exists = invoiceRepository.existsByContractAndMonthAndYear(contract, nextMonthValue, yearValue);
...
Invoice invoice = Invoice.builder()
.contract(contract)
.invoiceType(contract.getInvoiceType())
.paymentType(contract.getPaymentType())
.paymentStatus(paymentStatusUnpaid)
.chargeAmount(contract.getItemPrice() * contract.getItemAmount())
.contractDate(setContractDate.atStartOfDay())
.isDeleted(false)
.dueDate(dueDate)
.build();
invoiceRepository.save(invoice);
}
}
}
그리고 실제로 사용한 기술이 성능적 이점을 가지는 지 확인하기 위해서
JobExecutionListenerSupport와 StepExecutionListenerSupport를 사용해서 각 Job과 Step에서 수행하는 시간을 측정하기 위한 코드를 작성하였다.
JobExecutionListenerSupport의 일부 코드이다.
@Component
@Slf4j
public class JobCompletionCheckListener extends JobExecutionListenerSupport {
private final ObjectMapper objectMapper = new ObjectMapper();
private LocalDateTime startTime;
private LocalDateTime endTime;
@Override
public void beforeJob(JobExecution jobExecution) {
startTime = LocalDateTime.now();
log.info("Job 시작:{}", jobExecution.getJobInstance().getJobName());
log.info("Job 시작 시간 : {}", startTime);
}
@Override
public void afterJob(JobExecution jobExecution) {
endTime = LocalDateTime.now();
log.info("Job 종료:{}", jobExecution.getJobInstance().getJobName());
log.info("종료 시간: {}", endTime);
Duration duration = Duration.between(startTime, endTime);
long minutes = duration.toMinutes();
long seconds = duration.minusMinutes(minutes).getSeconds();
log.info("지속 시간: {}분 {}초", minutes, seconds);
BatchStatus jobStatus = jobExecution.getStatus();
ExitStatus jobExitStatus = jobExecution.getExitStatus();
log.info("Job 상태: {}, 결과: {}", jobStatus, jobExitStatus);
if (jobStatus == BatchStatus.FAILED) {
log.error("Job 실패: {}", jobExecution.getJobInstance().getJobName());
jobExecution.getAllFailureExceptions().forEach(exception -> {
log.error("Job 예외 발생: {}", exception.getMessage(), exception);
});
}
long totalReadCount = jobExecution.getStepExecutions()
.stream()
.mapToLong(StepExecution::getReadCount)
.sum();
long totalWriteCount = jobExecution.getStepExecutions()
.stream()
.mapToLong(StepExecution::getWriteCount)
.sum();
long totalSkipCount = jobExecution.getStepExecutions()
.stream()
.mapToLong(StepExecution::getSkipCount)
.sum();
long totalFailureCount = jobExecution.getStepExecutions()
.stream()
.filter(stepExecution -> stepExecution.getStatus() == BatchStatus.FAILED)
.count();
log.info("Job 처리 결과 - 총 읽기 건수: {}, 총 성공 건수: {}, 총 스킵 건수: {}, 총 실패 Step 수: {}",
totalReadCount, totalWriteCount, totalSkipCount, totalFailureCount);
...
}
}
StepExecutionListenerSupport의 일부 코드이다.
@Component
@Slf4j
public class StepCompletionCheckListener extends StepExecutionListenerSupport {
private final ObjectMapper objectMapper = new ObjectMapper();
private LocalDateTime stepStartTime;
private LocalDateTime stepEndTime;
@Override
public void beforeStep(StepExecution stepExecution) {
stepStartTime = LocalDateTime.now();
log.info("Step 시작: {}", stepExecution.getStepName());
log.info("Step 시작 시간 : {}", stepStartTime);
}
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
stepEndTime = LocalDateTime.now();
log.info("Step 종료: {}", stepExecution.getStepName());
log.info("Step 종료 시간: {}", stepEndTime);
Duration duration = Duration.between(stepStartTime, stepEndTime);
long minutes = duration.toMinutes();
long seconds = duration.minusMinutes(minutes).getSeconds();
log.info("Step 지속 시간: {}분 {}초", minutes, seconds);
// Step 상태 정보
ExitStatus exitStatus = stepExecution.getExitStatus();
BatchStatus batchStatus = stepExecution.getStatus();
log.info("Step 종료: {}", stepExecution.getStepName());
// 각 Step에서의 처리 결과
long totalReadCount = stepExecution.getReadCount();
long totalWriteCount = stepExecution.getWriteCount();
long totalSkipCount = stepExecution.getSkipCount();
long commitCount = stepExecution.getCommitCount();
log.info("Step 처리 결과 - 상태: {}, 결과: {}, 총 읽기 건수: {}, 총 쓰기 건수: {}, 커밋 건수: {}, 스킵 건수: {}",
batchStatus, exitStatus, totalReadCount, totalWriteCount, commitCount, totalSkipCount);
...
}
}
이를 통해 같은 로직을 수행하는 JdbcCursorItemReader + BatchInsert() 조합과 JpaPageItemReader + save()의 경우의 성능을 비교해봤다.


측정결과

위의 그래프와 같이 데이터 숫자가 많아지면 많아질수록 성능 차이가 두드러졌으며 10,000건 기준으로는 성능 차이가 약 57배까지 보였다.
이와 같은 과정을 통해, 해당 프로젝트에서 배치 작업을 수행함에 필요한 ' 읽고 쓰는 ' 과정에서의 사용할 구현체를 선택하였다.
다음 글에서는 배치 작업에서의 오류 제어 로직과 모니터링에 대해 고민하고 구현한 내용을 다뤄볼 예정이다.
참고한 내용
스프링 배치 강의 | 정수원 - 인프런
정수원 | 초급에서 중~고급에 이르기까지 스프링 배치의 기본 개념부터 API 사용법과 내부 아키텍처 구조를 심도있게 다룹니다. 그리고 스프링 배치 각 기능의 흐름과 원리를 학습하게 되고 이를
www.inflearn.com
https://jaeseo0519.tistory.com/400#--%--Additional%--Improvement
https://jaeseo0519.tistory.com/397
[Spring Boot] 정기 푸시 알림(Push Notification) 전송 배치(Batch) 프로세스
💡 문제가 되는 부분이 많고, Batch에 대해 미숙한 이해를 기반으로 작성한 글이므로 참고로만 읽어주세요.📕 목차1. Introduction2. Domain3. Infra4. Batch5. Discussion Topics1. Introduction 📌 Usecase사용자는
jaeseo0519.tistory.com
https://jojoldu.tistory.com/336
7. Spring Batch 가이드 - ItemReader
앞의 과정들을 통해 Spring Batch가 Chunk 지향 처리를 하고 있으며 이를 Job과 Step으로 구성되어 있음을 배웠습니다. Step은 Tasklet 단위로 처리되고, Tasklet 중에서 ChunkOrientedTasklet을 통해 Chunk를 처리하
jojoldu.tistory.com
https://tech.kakaopay.com/post/ifkakao2022-batch-performance-read/
[if kakao 2022] Batch Performance를 고려한 최선의 Reader | 카카오페이 기술 블로그
if(kakao)2022 대량의 데이터를 Batch로 읽을 때의 노하우를 공유합니다.
tech.kakaopay.com
'Dev Log > Batch개발(청구-수납)' 카테고리의 다른 글
| Spring Batch 서버 개발(3) - 예외 제어와 모니터링 (1) | 2024.11.11 |
|---|---|
| Spring Batch 서버 개발(1) (0) | 2024.11.01 |
댓글