Empezando con Spring Batch 5


Spring Boot | Juan Renato Noh | 20 octubre 2023



Este artículo está diseñado para aprender los fundamentos de Spring Batch y  construir una aplicación utilizando la versión 5 la cual es la actual al momento de publicar esta información. 


A lo largo de la lectura se incluyeron ejercicios  prácticos que  buscan reafirmar los conocimientos de la teoría explicada , de igual manera al final al finalizar cada práctica comparto la ubicación del repositorio con el ejercicio completo. 


Espero que este contenido le sea de su agrado .  

Introducción 


Muchas aplicaciones requieren realizar la consulta y  procesamiento de información en un  espacio determinado de tiempo y en grandes volúmenes de información, en estas situaciones los arquitectos de software optan por la implementación de los procesos batch.


Algunos casos  de uso comunes en la industria son la generación de reportes de ventas , pólizas contables, carga de archivos , y  otros más .Para estos escenarios los desarrolladores Java tenemos la posibilidad de utilizar  Spring Batch, un framework open source , ligero  el cual cuenta con funcionalidades que permiten desarrollar un proceso por lotes con el uso de funciones reutilizables para el procesamiento y lectura de grandes volúmenes de datos  y que a su vez son compatibles con las bondades que provee el ecosistema de Spring boot .


Spring Batch es una implementación que se liberó por  Spring Source (VMware)  en conjunto con  Accenture , quien es una empresa que  cuenta con  una gran experiencia implementando arquitecturas batch con varios de sus clientes. 




Imagen 1 - Logo Spring Batch . 


El  objetivo principal que busca Spring Batch es que el desarrollador se enfoque en la lógica de negocio  y dejar a la infraestructura que cubra los demás aspectos necesarios en un proceso por lotes. 


Spring Batch nos ayuda con : 


  • Lectura de grandes volúmenes de datos de una base de datos , archivo o cola . 

  • Procesamiento y transformación de datos. 

  • Escritura de datos en bloques .

  • Aplicación de una  política de reintentos . 

  • Procesamiento en paralelo. 

  • Procesamiento en la nube .


Arquitectura Spring Batch

Imagen 2 - Arquitectura de Spring Batch. 


En general la arquitectura de Spring Batch cuenta con 3 principales componentes 


  • Application : Este componente contiene los jobs y todo  el código personalizado que se ha escrito por los desarrolladores con Spring Batch.


  • Batch Core : Contiene las clases necesarias para lanzar y controlar un Job  (Job Launcher , Job , Step).   


  • Batch Infrastructure : Contiene clases  (ItemReader , ItemWriter ) y servicios (RetryTemplate) que son usados en común por ambas capas . 


Práctica 1 - Configuración Spring Batch


Historia de Usuario


Yo Albert Einstein deseo se realice la configuración inicial para la versión actual de Spring Batch


Requisitos previos 


  1. Tener instalado jdk 17 ( Versión mínima requerida para Spring Batch 5).

  2. Maven 3.9.4 o superior  (En algunas versiones inferiores hay incompatibilidad con el  jdk 17 ). 

  3. Conexion a Base de Datos MySQL.


Nota :  Siguiendo la dinámica de Scrum decidimos plantear cada una de las prácticas con una historia de usuario . De igual forma al principio de cada práctica encontrará un listado de requisitos previos a tomar en cuenta para poder concluir cada tutorial . 


Para configurar el proyecto se puede realizar desde cualquier IDE, el Spring Boot CLI o si prefiere  el spring initializr.


Imagen 3 -Configuración de un proyecto spring batch en spring initializr


Los valores importantes a considerar es: 

  • Incluir la dependencia Spring Batch

  • Incluir la dependencia con el conector de MySql

  • Seleccionar una versión de spring boot que cuente con la versión de Spring Batch 5  . Al momento de ser escrito el artículo la versión actual (3.1.4)  cumplía este requerimiento . 


Si se ha escogido como gestor de dependencias maven el pom se observa así : 



<parent>

<groupId>org.springframework.boot</groupId>

<artifactId>spring-boot-starter-parent</artifactId>

<version>3.1.4</version>

<relativePath/> <!-- lookup parent from repository -->

</parent>

<groupId>com.jrnoh</groupId>

<artifactId>empezando-spring-batch</artifactId>

<version>0.0.1-SNAPSHOT</version>

<name>empezando-spring-batch</name>

<description>Demo project for Spring Boot</description>

<properties>

<java.version>17</java.version>

</properties>

<dependencies>

<dependency>

<groupId>org.springframework.boot</groupId>

<artifactId>spring-boot-starter-batch</artifactId>

</dependency>


<dependency>

<groupId>com.mysql</groupId>

<artifactId>mysql-connector-j</artifactId>

<scope>runtime</scope>

</dependency>

<dependency>

<groupId>org.springframework.boot</groupId>

<artifactId>spring-boot-starter-test</artifactId>

<scope>test</scope>

</dependency>

<dependency>

<groupId>org.springframework.batch</groupId>

<artifactId>spring-batch-test</artifactId>

<scope>test</scope>

</dependency>

</dependencies>




Ahora con estas simples configuraciones ya podemos crear nuestro primer batch, para  esto agregaremos la siguiente clase  : 


 

package com.jrnoh.demo.em.sb;


import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.springframework.batch.core.Job;

import org.springframework.batch.core.StepContribution;

import org.springframework.batch.core.job.builder.JobBuilder;

import org.springframework.batch.core.repository.JobRepository;

import org.springframework.batch.core.scope.context.ChunkContext;

import org.springframework.batch.core.step.builder.StepBuilder;

import org.springframework.batch.core.step.tasklet.Tasklet;

import org.springframework.batch.core.step.tasklet.TaskletStep;

import org.springframework.batch.repeat.RepeatStatus;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import org.springframework.transaction.PlatformTransactionManager;



//Indica al contedor de beans de spring que la gestione

@Configuration

public class DirectionsSincronitationJob {

Logger logger = LoggerFactory.getLogger(this.getClass());

//Job que representa el batch en el contedor

@Bean

public Job sincronationDirection(JobRepository jobRepository,TaskletStep initStep) {

return new JobBuilder("directionsSincronitationJob", jobRepository)

.start(initStep) //Tarea que realizara el job

.build();

}

//Accion que realizara el job  , en este caso se puso que unicamente imprima un mensaje en consola 

@Bean

public TaskletStep initStep(JobRepository jobRepository, PlatformTransactionManager p) {

return new StepBuilder("initStep", jobRepository)

.tasklet(new Tasklet() {

@Override

public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {

logger.info("******************** Iniciando Batch ***********");

return RepeatStatus.FINISHED;

}

}, p)

.build();

}


}



Y tambien las siguientes propiedades en el application.properties


#Conexion a la base de datos 


spring.datasource.url = jdbc:mysql://localhost:3306/empezando-spring-batch?createDatabaseIfNotExist=true

spring.datasource.username = root

spring.datasource.password = password


spring.jpa.hibernate.ddl-auto = update


# Crear el esquema de bd spring batch  al arrancar el proyecto

spring.batch.jdbc.initialize-schema=ALWAYS


Ahora ya podemos ejecutar la aplicación  (Si es de su preferencia la terminal ejecute mvn spring-boot:run ) . 



Primera ejecución del aplicativo batch. 


 

Otra cosa interesante es que al revisar nuestra base de datos podemos observar un esquema de base de datos creado con varias tablas las cuales han sido creadas por Spring Batch . En el transcurso de este artículo iremos hablando acerca del uso de algunas de ellas.

Modelo creado al ejecutar aplicación con Spring Batch. 


El repositorio de código con el ejercicio completo lo puede encontrar en el siguiente : 


https://github.com/juanrenatonoh/empezando-con-spring-batch/tree/main/1-configuracion-spring-batch


JSR-352 y Jakarta Batch


El Java Specification Request 352 es la especificación que la comunidad ha definido para los desarrolladores de aplicaciones batch . En ella se encuentran las directrices de cómo implementar esta tecnología en entornos Java EE y Java SE . 


Actualmente  se encuentran publicadas  en la página de Java Community Process 2 implementaciones :  JBeret y Spring Batch.


Sin embargo por la creación del proyecto Jakarta EE se han renombrado  paquetes “javax” a “jakarta” y ha surgido la aparición de una nueva especificación (Jakarta Batch ) provocando que  JBeret base su implementación en este nuevo estándar .

Para el caso de Spring Batch en su versión 5 ha decidido discontinuar el uso del JSR-352 de acuerdo a la documentación oficial y todavía no añadir la especificación Jakarta Batch ya que el número de consumidores por esta vía es pequeño . 



Por ahora lo más obvio es que para las versiones e implementaciones recientes de java mejor consultemos a Jakarta Batch


Si desea comparar  ambas especificaciones  los enlaces son : 





Domain Language Batch


Spring batch ha basado  sus conceptos en la especificación Java Batch para proveer de una arquitectura  , componentes y servicios que resulten familiares a sus usuarios . El siguiente diagrama muestra los conceptos principales conocidos como el “Domain Language Batch”.

Conceptos principales Spring Batch (Domain Language Batch)


Los conceptos principales los podemos resumir de la siguiente manera : 

  • El JobLauncher es el encargado de ejecutar los Job en base a un listado de parámetros (JobParameter).

  • Un Job puede tener de uno a múltiples Steps ,en otras palabras es un contenedor de Steps.

  • Un Step es un objeto de dominio secuencial contenido en un Job puede estar conformado por  solo un ItemReader , ItemProcesor e ItemWriter.

  • El JobRepository es el mecanismo de persistencia que permite almacenar el scope del proceso batch. Su uso es para facilitar el reinicio del Job .

¿ Que es un Job ?

Jerarquía de un Job 


Un Job es una entidad que representa un proceso batch  es decir dentro de él está contenido el dominio del negocio y son ejecutados mediante una secuencia de pasos . Las características de un job son : 


  1. Posee un nombre.

  2. Tiene Step 's definidos y ordenados . 

  3. Puede o no ser reiniciable.


En  la imagen “Jerarquía de un Job” se presenta un proceso el cual es de ejecución diaria por ejemplo el 1 de Enero se ejecuta por primera vez  , el 2 de Enero se vuelve ejecutar, y así sucesivamente . A cada una de estas ejecución de forma lógica se les conoce como  Job Instance. 

Lo que distingue un JobInstance de otro son los valores parametrizados  , en este caso sería la fecha (1 Enero , 2 Enero , etc ) . Los parámetros  en Spring batch son conocidos como Job Parameter y su función es mantener un conjunto de valores que son proporcionados al iniciar un Job.


Ahora bien  ¿Es posible que un Job Instance sea ejecutado múltiples veces  ?  La respuesta es SÍ ,ya que un JobInstance no se considera completo a menos que la ejecución ( completa ) sea exitosa , por lo que previamente pudieron haber ocurrido múltiples intentos fallidos .  Cada uno de estos intentos  se le denomina  JobExecution .


Al ejecutar un batch en el esquema de Spring Batch existen las respectivas tablas  para almacenar cada uno de los metadatos de Job , Job Instance , Job Parameter y Job Instance.


Práctica 2 - Configuración y Ejecución del Job




Historia de Usuario


Yo Galileo deseo contar con un Api Rest para ejecutar y consultar mi proceso batch .


Requisitos previos 


  1. Haber realizado la práctica “Configuración Spring Batch” .  


De acuerdo al requerimiento se necesita un Api Rest por lo que ahora incluiremos la dependencia, Spring Web al pom de nuestro proyecto. 




<dependency>

<groupId>org.springframework.boot</groupId>

<artifactId>spring-boot-starter-web</artifactId>

</dependency>



Ahora crearemos nuestra clase JobRest con nuestros servicios de ejecutar y consultar . 




import java.text.ParseException;

import java.text.SimpleDateFormat;

import java.util.HashMap;


import org.springframework.batch.core.Job;

import org.springframework.batch.core.JobParametersBuilder;

import org.springframework.batch.core.JobParametersInvalidException;

import org.springframework.batch.core.explore.JobExplorer;

import org.springframework.batch.core.launch.JobLauncher;

import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;

import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;

import org.springframework.batch.core.repository.JobRestartException;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.http.ResponseEntity;

import org.springframework.web.bind.annotation.GetMapping;

import org.springframework.web.bind.annotation.PathVariable;

import org.springframework.web.bind.annotation.PostMapping;

import org.springframework.web.bind.annotation.RequestBody;

import org.springframework.web.bind.annotation.RequestMapping;

import org.springframework.web.bind.annotation.RestController;


/***

 * 

 * @author jnoh

 *

 */


@RestController

@RequestMapping("/job")

public class JobRest {

/***

* Servicio que me permite realizar el disparo de un job 

*/

@Autowired

JobLauncher jobLauncher;

/***

* Inyeccion de mi job

*/

@Autowired

Job job;

/**

* Servicio para consultar el JobRepository y todas las entidades del modelo 

* funcionalidades ahi 

*/

@Autowired

JobExplorer jobExplorer;

/**

* Lanza un job 

curl --location --request POST 'http://localhost:8080/job/ejecutar' \

--header 'Content-Type: application/json' \

--data-raw '{

  "fecha" : "2023-10-16"

}

'

* @throws JobExecutionAlreadyRunningException

* @throws JobRestartException

* @throws JobInstanceAlreadyCompleteException

* @throws JobParametersInvalidException

* @throws ParseException 

*/

@PostMapping("/ejecutar")

public ResponseEntity<HashMap<String, Object>> ejecutar(@RequestBody HashMap<String,Object> request)

{

var response = new HashMap<String,Object>();

try {

//Obtencion de los parametros y creacion de JobParametros por builder

var fecha = new SimpleDateFormat("yyyy-MM-dd").parse(request.get("fecha").toString());

var jobParametersBuilder = new JobParametersBuilder()

.addDate("fecha", fecha);

//Ejecucion Job por medio de job launcher 

var jobExecution = jobLauncher.run(job, jobParametersBuilder.toJobParameters());

response.put("jobId", jobExecution.getId());

} catch (JobExecutionAlreadyRunningException | JobRestartException | JobInstanceAlreadyCompleteException

| JobParametersInvalidException e ) 

{

// Error en caso ya exista recordar los conceptos de JobInstance , JobParameter y JobExecution

response.put("errorMessage :", e.getMessage());

}catch (Exception e) {

response.put("errorMessage :", "Ocurrio un error no controlado");

}

return ResponseEntity.ok(response);

}

/***

* Consulta por modelo

curl --location 'http://localhost:8080/job/job-execution/poneridaca' \

* @return

*/

@GetMapping("/job-execution/{jobExecutionId}")

public ResponseEntity<String> obtenerJobsInstances(@PathVariable Long jobExecutionId){

//Se consulta por medio del servicio que provee Spring Batch , se usa la interfaz Job Explorer cuando se crea el job tenemos un JobInstance.Id

var jobExecution =  jobExplorer.getJobExecution(jobExecutionId);

return ResponseEntity.ok(jobExecution == null ? "" : jobExecution.toString());

}


}


Y la última modificación en el código es incluir la anotación @EnableBatchProcesing en la configuración del Job .



//Indica al contedor de beans de spring que la gestione

@Configuration

@EnableBatchProcessing 

public class DirectionsSincronitationJob {


 


¡ Ahora es tiempo de ejecutar y probar los servicios !




Ejecución Spring Batch configurado con anotación. 


Lo primero que observamos ahora es que al levantar la aplicación el Job no se ejecutó de forma automática , esto es porque a partir de la versión 5 en Spring Batch al utilizar @EnableBatchProcesing o DefaultBatchConfigurer es necesario lanzar un Job por medio del JobLauncher .  


Para lanzar el job realicemos la siguiente petición al API . 



curl --location --request POST 'http://localhost:8080/job/ejecutar' \

--header 'Content-Type: application/json' \

--data-raw '{

 "fecha" : "2023-10-16"

}'


Observamos que como resultado obtenemos un JobId de respuesta con el cual podemos  consultar  en el modelo la información de ejecución.


Metadata Job ejecutado desde Api Rest. 


Para consultar la información de la ejecución del Job desde el api realicemos la siguiente petición utilizando el JobId obtenido en la creación . 



curl  'http://localhost:8080/job/job-execution/2'  \

--header 'Content-Type: application/json'




Consulta Metadata Job desde Api Rest



El repositorio de código con el ejercicio completo lo puede encontrar en el siguiente : 


https://github.com/juanrenatonoh/empezando-con-spring-batch/tree/main/2-configuracion-ejecucion-job


¿Que es un Step ? 


Un Step es un objeto de dominio que encapsula una fase del dominio del Job y puede ser tan compleja o sencilla como se desee  . Al igual que el Job por cada intento de ejecución es almacenado es almacenado en el contexto de ejecución y se les conoce como StepExecution.


Por lo general un Step está conformado por : 


  • ItemReader : Abstracción que representa la obtención de datos. 

  • ItemProcesor : Abstracción que representa el procesamiento y aplicación de reglas de negocio.

  • ItemWriter : Abstracción que representa la salida de datos.


Una de las ventajas que provee Spring Batch es que provee múltiples implementaciones de ItemReader e ItemWriter permitiendo la lectura y escritura en múltiples circunstancias que lo requiere el negocio.


Apendice ItemReader e ItemWriter

De igual manera Spring Batch trabaja bajo el concepto de “chunk-oriented” es decir se define el tamaño de chunk (fragmento de datos) con el cual primero se leerán y / o procesaran para que una vez este completo ItemWriter se encarga de escribir y confirmarlos . 



Chunk-oriented con ItemProcessor


Practica 3 - Configuración y Ejecución Step’s



Historia de Usuario


Yo Julio Verne deseo que se importen las direcciones contenidas en el archivo direction.csv .


Requisitos previos 


  1. Haber realizado la práctica “Configuración  y Ejecución del Job ” .  


De acuerdo al requerimiento ahora necesitaremos importan las direcciones del csv direction.csv  , por lo que  crearemos una carpeta llamada staging en la raíz del proyecto y ahí agregaremos el archivo despues de descargarlo . 




Archivo direction agregado al proyecto para importarlo


Ahora incluiremos Spring Data JPA en el pom.xml

 



<dependency>

<groupId>org.springframework.boot</groupId>

<artifactId>spring-boot-starter-data-jpa</artifactId>

</dependency>




¡ Es hora de modelar los datos de entrada y salida! . Primero crearemos la clase DirectionRecord que nos servirá para vaciar las filas del csv . 



public record  DirectionRecord  ( String codigoPostal,String asentamiento,String tipoAsentamiento,String municipio ){}


Y ahora creamos la entidad Direction 



@Entity

@Table(name = "Direction")

public class Direction {

@Id

@GeneratedValue(strategy = GenerationType.IDENTITY)

private Integer id;

private String codigoPostal;

private String asentamiento;

private String tipoAsentamiento;

private String municipio;


public Integer getId() {

return id;

}


public void setId(Integer id) {

this.id = id;

}


public String getCodigoPostal() {

return codigoPostal;

}


public void setCodigoPostal(String codigoPostal) {

this.codigoPostal = codigoPostal;

}


public String getAsentamiento() {

return asentamiento;

}


public void setAsentamiento(String asentamiento) {

this.asentamiento = asentamiento;

}


public String getTipoAsentamiento() {

return tipoAsentamiento;

}


public void setTipoAsentamiento(String tipoAsentamiento) {

this.tipoAsentamiento = tipoAsentamiento;

}


public String getMunicipio() {

return municipio;

}


public void setMunicipio(String municipio) {

this.municipio = municipio;

}

}


Con estas clases ya es posible  crear nuestro Step que se encargará de la importación de los registros . Para esto incluimos los siguientes beans en la clase DirectionsSincronitationJob.



/***

* Paso generado para la importacion de las direcciones

* @param jobRepository

* @param p

* @param fileReaderDirections

* @param procesorDirections

* @param writerDirections

* @return

*/

@Bean

public Step importDirections(JobRepository jobRepository,PlatformTransactionManager p

,FlatFileItemReader<DirectionRecord> fileReaderDirections

,ItemProcessor<DirectionRecord, Direction> procesorDirections

, RepositoryItemWriter<Direction> writerDirections) {

return new StepBuilder("importDirections",jobRepository)

.<DirectionRecord,Direction>chunk(10, p)//fragmentos de 10 

.reader(fileReaderDirections)

.processor(procesorDirections)

.writer(writerDirections)

.build();

}

/***

* Lectura del archivo

* @return

*/

@Bean

public FlatFileItemReader<DirectionRecord> fileReaderDirections(){

return new FlatFileItemReaderBuilder<DirectionRecord>()

.name("fileReaderDirections")

.resource(new FileSystemResource("staging/directions.csv")) // ubicacion del archivo

.delimited()

.names("codigoPostal","asentamiento","tipoAsentamiento","municipio")

.targetType(DirectionRecord.class) // clase en la que se vacia el cisv

.build();

}

@Bean

public ItemProcessor<DirectionRecord, Direction> procesorDirections(){

//convierte el direction record a una direction (entidad)

return new ItemProcessor<DirectionRecord, Direction>() {


@Override

public Direction process(DirectionRecord d) throws Exception {

Direction direction = new Direction();

direction.setAsentamiento(d.asentamiento());

direction.setCodigoPostal(d.codigoPostal());

direction.setMunicipio(d.municipio());

direction.setTipoAsentamiento(d.tipoAsentamiento());

return direction;

}

};

}

/***

* Se encarga de persistir por entidades 

* @param directionRepository

* @return

*/

@Bean

public RepositoryItemWriter<Direction> writerDirections(DirectionRepository directionRepository){

return new RepositoryItemWriterBuilder<Direction>()

.repository(directionRepository)

.build();

}


Por último agregamos el nuevo Step en el Job



//Job que representa el batch en el contedor

@Bean

public Job sincronationDirection(JobRepository jobRepository,TaskletStep initStep,Step importDirections) {

return new JobBuilder(NAME_JOB, jobRepository)

.start(initStep) //Tarea que realizara el job

.next(importDirections)//Nueva Tarea importacion

.build();

}




¡ Ahora es tiempo de ejecutar y probar los servicios ! (Recuerda que la ejecución del servicio lo vimos en la práctica 2)



Resultado en consola y StepExecution con la importación




Direcciones importadas


El repositorio de código con el ejercicio completo lo puede encontrar en el siguiente : 


https://github.com/juanrenatonoh/empezando-con-spring-batch/tree/main/3-configuracion-ejecucion-steps


Referencias



Comentarios

Entradas populares de este blog

Plataformas gratuitas para desplegar aplicaciones web

Mi experiencia corriendo deepseek en local .

Documentar API REST con Spring Doc (Swagger)