Empezando con Spring Batch 5
Spring Boot | Juan Renato Noh | 20 octubre 2023
Este artículo está diseñado para aprender los fundamentos de Spring Batch y construir una aplicación utilizando la versión 5 la cual es la actual al momento de publicar esta información.
A lo largo de la lectura se incluyeron ejercicios prácticos que buscan reafirmar los conocimientos de la teoría explicada , de igual manera al final al finalizar cada práctica comparto la ubicación del repositorio con el ejercicio completo.
Espero que este contenido le sea de su agrado .
Introducción
Muchas aplicaciones requieren realizar la consulta y procesamiento de información en un espacio determinado de tiempo y en grandes volúmenes de información, en estas situaciones los arquitectos de software optan por la implementación de los procesos batch.
Algunos casos de uso comunes en la industria son la generación de reportes de ventas , pólizas contables, carga de archivos , y otros más .Para estos escenarios los desarrolladores Java tenemos la posibilidad de utilizar Spring Batch, un framework open source , ligero el cual cuenta con funcionalidades que permiten desarrollar un proceso por lotes con el uso de funciones reutilizables para el procesamiento y lectura de grandes volúmenes de datos y que a su vez son compatibles con las bondades que provee el ecosistema de Spring boot .
Spring Batch es una implementación que se liberó por Spring Source (VMware) en conjunto con Accenture , quien es una empresa que cuenta con una gran experiencia implementando arquitecturas batch con varios de sus clientes.
El objetivo principal que busca Spring Batch es que el desarrollador se enfoque en la lógica de negocio y dejar a la infraestructura que cubra los demás aspectos necesarios en un proceso por lotes.
Spring Batch nos ayuda con :
Lectura de grandes volúmenes de datos de una base de datos , archivo o cola .
Procesamiento y transformación de datos.
Escritura de datos en bloques .
Aplicación de una política de reintentos .
Procesamiento en paralelo.
Procesamiento en la nube .
Arquitectura Spring Batch
En general la arquitectura de Spring Batch cuenta con 3 principales componentes
Application : Este componente contiene los jobs y todo el código personalizado que se ha escrito por los desarrolladores con Spring Batch.
Batch Core : Contiene las clases necesarias para lanzar y controlar un Job (Job Launcher , Job , Step).
Batch Infrastructure : Contiene clases (ItemReader , ItemWriter ) y servicios (RetryTemplate) que son usados en común por ambas capas .
Práctica 1 - Configuración Spring Batch
Requisitos previos
Tener instalado jdk 17 ( Versión mínima requerida para Spring Batch 5).
Maven 3.9.4 o superior (En algunas versiones inferiores hay incompatibilidad con el jdk 17 ).
Conexion a Base de Datos MySQL.
Nota : Siguiendo la dinámica de Scrum decidimos plantear cada una de las prácticas con una historia de usuario . De igual forma al principio de cada práctica encontrará un listado de requisitos previos a tomar en cuenta para poder concluir cada tutorial .
Para configurar el proyecto se puede realizar desde cualquier IDE, el Spring Boot CLI o si prefiere el spring initializr.
Los valores importantes a considerar es:
Incluir la dependencia Spring Batch
Incluir la dependencia con el conector de MySql
Seleccionar una versión de spring boot que cuente con la versión de Spring Batch 5 . Al momento de ser escrito el artículo la versión actual (3.1.4) cumplía este requerimiento .
Si se ha escogido como gestor de dependencias maven el pom se observa así :
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.1.4</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.jrnoh</groupId>
<artifactId>empezando-spring-batch</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>empezando-spring-batch</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>17</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<dependency>
<groupId>com.mysql</groupId>
<artifactId>mysql-connector-j</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.batch</groupId>
<artifactId>spring-batch-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
Ahora con estas simples configuraciones ya podemos crear nuestro primer batch, para esto agregaremos la siguiente clase :
package com.jrnoh.demo.em.sb;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.core.step.tasklet.TaskletStep;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.PlatformTransactionManager;
//Indica al contedor de beans de spring que la gestione
@Configuration
public class DirectionsSincronitationJob {
Logger logger = LoggerFactory.getLogger(this.getClass());
//Job que representa el batch en el contedor
@Bean
public Job sincronationDirection(JobRepository jobRepository,TaskletStep initStep) {
return new JobBuilder("directionsSincronitationJob", jobRepository)
.start(initStep) //Tarea que realizara el job
.build();
}
//Accion que realizara el job , en este caso se puso que unicamente imprima un mensaje en consola
@Bean
public TaskletStep initStep(JobRepository jobRepository, PlatformTransactionManager p) {
return new StepBuilder("initStep", jobRepository)
.tasklet(new Tasklet() {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
logger.info("******************** Iniciando Batch ***********");
return RepeatStatus.FINISHED;
}
}, p)
.build();
}
}
Y tambien las siguientes propiedades en el application.properties
#Conexion a la base de datos
spring.datasource.url = jdbc:mysql://localhost:3306/empezando-spring-batch?createDatabaseIfNotExist=true
spring.datasource.username = root
spring.datasource.password = password
spring.jpa.hibernate.ddl-auto = update
# Crear el esquema de bd spring batch al arrancar el proyecto
spring.batch.jdbc.initialize-schema=ALWAYS
Ahora ya podemos ejecutar la aplicación (Si es de su preferencia la terminal ejecute mvn spring-boot:run ) .
Otra cosa interesante es que al revisar nuestra base de datos podemos observar un esquema de base de datos creado con varias tablas las cuales han sido creadas por Spring Batch . En el transcurso de este artículo iremos hablando acerca del uso de algunas de ellas.
El repositorio de código con el ejercicio completo lo puede encontrar en el siguiente :
https://github.com/juanrenatonoh/empezando-con-spring-batch/tree/main/1-configuracion-spring-batch
JSR-352 y Jakarta Batch
El Java Specification Request 352 es la especificación que la comunidad ha definido para los desarrolladores de aplicaciones batch . En ella se encuentran las directrices de cómo implementar esta tecnología en entornos Java EE y Java SE .
Actualmente se encuentran publicadas en la página de Java Community Process 2 implementaciones : JBeret y Spring Batch.
Sin embargo por la creación del proyecto Jakarta EE se han renombrado paquetes “javax” a “jakarta” y ha surgido la aparición de una nueva especificación (Jakarta Batch ) provocando que JBeret base su implementación en este nuevo estándar .
Para el caso de Spring Batch en su versión 5 ha decidido discontinuar el uso del JSR-352 de acuerdo a la documentación oficial y todavía no añadir la especificación Jakarta Batch ya que el número de consumidores por esta vía es pequeño .
Por ahora lo más obvio es que para las versiones e implementaciones recientes de java mejor consultemos a Jakarta Batch
Si desea comparar ambas especificaciones los enlaces son :
Domain Language Batch
Spring batch ha basado sus conceptos en la especificación Java Batch para proveer de una arquitectura , componentes y servicios que resulten familiares a sus usuarios . El siguiente diagrama muestra los conceptos principales conocidos como el “Domain Language Batch”.
Los conceptos principales los podemos resumir de la siguiente manera :
El JobLauncher es el encargado de ejecutar los Job en base a un listado de parámetros (JobParameter).
Un Job puede tener de uno a múltiples Steps ,en otras palabras es un contenedor de Steps.
Un Step es un objeto de dominio secuencial contenido en un Job puede estar conformado por solo un ItemReader , ItemProcesor e ItemWriter.
El JobRepository es el mecanismo de persistencia que permite almacenar el scope del proceso batch. Su uso es para facilitar el reinicio del Job .
¿ Que es un Job ?
Un Job es una entidad que representa un proceso batch es decir dentro de él está contenido el dominio del negocio y son ejecutados mediante una secuencia de pasos . Las características de un job son :
Posee un nombre.
Tiene Step 's definidos y ordenados .
Puede o no ser reiniciable.
En la imagen “Jerarquía de un Job” se presenta un proceso el cual es de ejecución diaria por ejemplo el 1 de Enero se ejecuta por primera vez , el 2 de Enero se vuelve ejecutar, y así sucesivamente . A cada una de estas ejecución de forma lógica se les conoce como Job Instance.
Lo que distingue un JobInstance de otro son los valores parametrizados , en este caso sería la fecha (1 Enero , 2 Enero , etc ) . Los parámetros en Spring batch son conocidos como Job Parameter y su función es mantener un conjunto de valores que son proporcionados al iniciar un Job.
Ahora bien ¿Es posible que un Job Instance sea ejecutado múltiples veces ? La respuesta es SÍ ,ya que un JobInstance no se considera completo a menos que la ejecución ( completa ) sea exitosa , por lo que previamente pudieron haber ocurrido múltiples intentos fallidos . Cada uno de estos intentos se le denomina JobExecution .
Al ejecutar un batch en el esquema de Spring Batch existen las respectivas tablas para almacenar cada uno de los metadatos de Job , Job Instance , Job Parameter y Job Instance.
Práctica 2 - Configuración y Ejecución del Job
Requisitos previos
Haber realizado la práctica “Configuración Spring Batch” .
De acuerdo al requerimiento se necesita un Api Rest por lo que ahora incluiremos la dependencia, Spring Web al pom de nuestro proyecto.
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
Ahora crearemos nuestra clase JobRest con nuestros servicios de ejecutar y consultar .
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.HashMap;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRestartException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/***
*
* @author jnoh
*
*/
@RestController
@RequestMapping("/job")
public class JobRest {
/***
* Servicio que me permite realizar el disparo de un job
*/
@Autowired
JobLauncher jobLauncher;
/***
* Inyeccion de mi job
*/
@Autowired
Job job;
/**
* Servicio para consultar el JobRepository y todas las entidades del modelo
* funcionalidades ahi
*/
@Autowired
JobExplorer jobExplorer;
/**
* Lanza un job
*
*
curl --location --request POST 'http://localhost:8080/job/ejecutar' \
--header 'Content-Type: application/json' \
--data-raw '{
"fecha" : "2023-10-16"
}
'
*
*
* @throws JobExecutionAlreadyRunningException
* @throws JobRestartException
* @throws JobInstanceAlreadyCompleteException
* @throws JobParametersInvalidException
* @throws ParseException
*/
@PostMapping("/ejecutar")
public ResponseEntity<HashMap<String, Object>> ejecutar(@RequestBody HashMap<String,Object> request)
{
var response = new HashMap<String,Object>();
try {
//Obtencion de los parametros y creacion de JobParametros por builder
var fecha = new SimpleDateFormat("yyyy-MM-dd").parse(request.get("fecha").toString());
var jobParametersBuilder = new JobParametersBuilder()
.addDate("fecha", fecha);
//Ejecucion Job por medio de job launcher
var jobExecution = jobLauncher.run(job, jobParametersBuilder.toJobParameters());
response.put("jobId", jobExecution.getId());
} catch (JobExecutionAlreadyRunningException | JobRestartException | JobInstanceAlreadyCompleteException
| JobParametersInvalidException e )
{
// Error en caso ya exista recordar los conceptos de JobInstance , JobParameter y JobExecution
response.put("errorMessage :", e.getMessage());
}catch (Exception e) {
response.put("errorMessage :", "Ocurrio un error no controlado");
}
return ResponseEntity.ok(response);
}
/***
* Consulta por modelo
curl --location 'http://localhost:8080/job/job-execution/poneridaca' \
* @return
*/
@GetMapping("/job-execution/{jobExecutionId}")
public ResponseEntity<String> obtenerJobsInstances(@PathVariable Long jobExecutionId){
//Se consulta por medio del servicio que provee Spring Batch , se usa la interfaz Job Explorer cuando se crea el job tenemos un JobInstance.Id
var jobExecution = jobExplorer.getJobExecution(jobExecutionId);
return ResponseEntity.ok(jobExecution == null ? "" : jobExecution.toString());
}
}
Y la última modificación en el código es incluir la anotación @EnableBatchProcesing en la configuración del Job .
//Indica al contedor de beans de spring que la gestione
@Configuration
@EnableBatchProcessing
public class DirectionsSincronitationJob {
¡ Ahora es tiempo de ejecutar y probar los servicios !
Lo primero que observamos ahora es que al levantar la aplicación el Job no se ejecutó de forma automática , esto es porque a partir de la versión 5 en Spring Batch al utilizar @EnableBatchProcesing o DefaultBatchConfigurer es necesario lanzar un Job por medio del JobLauncher .
Para lanzar el job realicemos la siguiente petición al API .
curl --location --request POST 'http://localhost:8080/job/ejecutar' \
--header 'Content-Type: application/json' \
--data-raw '{
"fecha" : "2023-10-16"
}'
Observamos que como resultado obtenemos un JobId de respuesta con el cual podemos consultar en el modelo la información de ejecución.
Para consultar la información de la ejecución del Job desde el api realicemos la siguiente petición utilizando el JobId obtenido en la creación .
curl 'http://localhost:8080/job/job-execution/2' \
--header 'Content-Type: application/json'
El repositorio de código con el ejercicio completo lo puede encontrar en el siguiente :
https://github.com/juanrenatonoh/empezando-con-spring-batch/tree/main/2-configuracion-ejecucion-job
¿Que es un Step ?
Un Step es un objeto de dominio que encapsula una fase del dominio del Job y puede ser tan compleja o sencilla como se desee . Al igual que el Job por cada intento de ejecución es almacenado es almacenado en el contexto de ejecución y se les conoce como StepExecution.
Por lo general un Step está conformado por :
ItemReader : Abstracción que representa la obtención de datos.
ItemProcesor : Abstracción que representa el procesamiento y aplicación de reglas de negocio.
ItemWriter : Abstracción que representa la salida de datos.
Una de las ventajas que provee Spring Batch es que provee múltiples implementaciones de ItemReader e ItemWriter permitiendo la lectura y escritura en múltiples circunstancias que lo requiere el negocio.
Apendice ItemReader e ItemWriter
De igual manera Spring Batch trabaja bajo el concepto de “chunk-oriented” es decir se define el tamaño de chunk (fragmento de datos) con el cual primero se leerán y / o procesaran para que una vez este completo ItemWriter se encarga de escribir y confirmarlos .
Practica 3 - Configuración y Ejecución Step’s
Requisitos previos
Haber realizado la práctica “Configuración y Ejecución del Job ” .
De acuerdo al requerimiento ahora necesitaremos importan las direcciones del csv direction.csv , por lo que crearemos una carpeta llamada staging en la raíz del proyecto y ahí agregaremos el archivo despues de descargarlo .
Ahora incluiremos Spring Data JPA en el pom.xml
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
¡ Es hora de modelar los datos de entrada y salida! . Primero crearemos la clase DirectionRecord que nos servirá para vaciar las filas del csv .
public record DirectionRecord ( String codigoPostal,String asentamiento,String tipoAsentamiento,String municipio ){}
Y ahora creamos la entidad Direction
@Entity
@Table(name = "Direction")
public class Direction {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Integer id;
private String codigoPostal;
private String asentamiento;
private String tipoAsentamiento;
private String municipio;
public Integer getId() {
return id;
}
public void setId(Integer id) {
this.id = id;
}
public String getCodigoPostal() {
return codigoPostal;
}
public void setCodigoPostal(String codigoPostal) {
this.codigoPostal = codigoPostal;
}
public String getAsentamiento() {
return asentamiento;
}
public void setAsentamiento(String asentamiento) {
this.asentamiento = asentamiento;
}
public String getTipoAsentamiento() {
return tipoAsentamiento;
}
public void setTipoAsentamiento(String tipoAsentamiento) {
this.tipoAsentamiento = tipoAsentamiento;
}
public String getMunicipio() {
return municipio;
}
public void setMunicipio(String municipio) {
this.municipio = municipio;
}
}
Con estas clases ya es posible crear nuestro Step que se encargará de la importación de los registros . Para esto incluimos los siguientes beans en la clase DirectionsSincronitationJob.
/***
* Paso generado para la importacion de las direcciones
* @param jobRepository
* @param p
* @param fileReaderDirections
* @param procesorDirections
* @param writerDirections
* @return
*/
@Bean
public Step importDirections(JobRepository jobRepository,PlatformTransactionManager p
,FlatFileItemReader<DirectionRecord> fileReaderDirections
,ItemProcessor<DirectionRecord, Direction> procesorDirections
, RepositoryItemWriter<Direction> writerDirections) {
return new StepBuilder("importDirections",jobRepository)
.<DirectionRecord,Direction>chunk(10, p)//fragmentos de 10
.reader(fileReaderDirections)
.processor(procesorDirections)
.writer(writerDirections)
.build();
}
/***
* Lectura del archivo
* @return
*/
@Bean
public FlatFileItemReader<DirectionRecord> fileReaderDirections(){
return new FlatFileItemReaderBuilder<DirectionRecord>()
.name("fileReaderDirections")
.resource(new FileSystemResource("staging/directions.csv")) // ubicacion del archivo
.delimited()
.names("codigoPostal","asentamiento","tipoAsentamiento","municipio")
.targetType(DirectionRecord.class) // clase en la que se vacia el cisv
.build();
}
@Bean
public ItemProcessor<DirectionRecord, Direction> procesorDirections(){
//convierte el direction record a una direction (entidad)
return new ItemProcessor<DirectionRecord, Direction>() {
@Override
public Direction process(DirectionRecord d) throws Exception {
Direction direction = new Direction();
direction.setAsentamiento(d.asentamiento());
direction.setCodigoPostal(d.codigoPostal());
direction.setMunicipio(d.municipio());
direction.setTipoAsentamiento(d.tipoAsentamiento());
return direction;
}
};
}
/***
* Se encarga de persistir por entidades
* @param directionRepository
* @return
*/
@Bean
public RepositoryItemWriter<Direction> writerDirections(DirectionRepository directionRepository){
return new RepositoryItemWriterBuilder<Direction>()
.repository(directionRepository)
.build();
}
Por último agregamos el nuevo Step en el Job
//Job que representa el batch en el contedor
@Bean
public Job sincronationDirection(JobRepository jobRepository,TaskletStep initStep,Step importDirections) {
return new JobBuilder(NAME_JOB, jobRepository)
.start(initStep) //Tarea que realizara el job
.next(importDirections)//Nueva Tarea importacion
.build();
}
¡ Ahora es tiempo de ejecutar y probar los servicios ! (Recuerda que la ejecución del servicio lo vimos en la práctica 2)
El repositorio de código con el ejercicio completo lo puede encontrar en el siguiente :
Referencias
https://docs.spring.io/spring-batch/docs/current/reference/html/index-single.html#itemReader
https://www.tutorialspoint.com/spring_batch/spring_batch_architecture.htm
Comentarios
Publicar un comentario