Spring Batch Processing Example

Project Structure

BatchConfig.java


package com.example.batch.config;

import java.io.File;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.data.RepositoryItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.LineMapper;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.FileSystemResource;
import com.example.batch.entity.Employee;
import com.example.batch.listener.JobListener;
import com.example.batch.listener.StepListener;
import com.example.batch.processor.EmployeeProcessor;
import com.example.batch.repository.EmployeeRepository;

/**
 * Spring Batch configuration that assembles the CSV-to-database import job.
 *
 * <p>The reader, writer, step and job factory methods are intentionally plain
 * methods (not {@code @Bean}s): every call builds a fresh object that reads the
 * current value of {@link #uploadFile}, so a new job can be created for each
 * uploaded file.
 *
 * <p>NOTE(review): {@link #uploadFile} is mutable state on this configuration
 * object; callers are expected to set it immediately before calling
 * {@link #startJob()}. Concurrent launches would race on it - confirm that jobs
 * are only started from a single thread.
 */
@Configuration
@EnableBatchProcessing
public class BatchConfig {
	/** Number of items read and processed before they are written as one chunk. */
	private static final int CHUNK_SIZE = 10;

	@Autowired
	private JobBuilderFactory jobBuilderFactory;
	@Autowired
	private StepBuilderFactory stepBuilderFactory;
	@Autowired
	private EmployeeRepository employeeRepository;
	/** Directory that contains the uploaded files (property {@code location}). */
	@Value("${location}")
	private String location;
	/** Name of the file, relative to {@link #location}, that the next job reads. */
	private String uploadFile = "";
	@Autowired
	private StepListener stepListener;
	@Autowired
	private JobListener jobListener;

	/**
	 * Builds a reader for the file {@code location/uploadFile}.
	 *
	 * @return a new reader that skips the first line (presumably the header row)
	 *         and maps every following line to an {@link Employee}
	 */
	public FlatFileItemReader<Employee> readDataFromFile() {
		FlatFileItemReader<Employee> itemReader = new FlatFileItemReader<>();
		FileSystemResource fileResource = new FileSystemResource(location + File.separator + uploadFile);
		itemReader.setResource(fileResource);
		itemReader.setName("csvReader");
		itemReader.setLinesToSkip(1);
		itemReader.setLineMapper(lineMapper());
		return itemReader;
	}

	/**
	 * Builds the mapper that turns one comma-delimited line into an {@link Employee}.
	 *
	 * @return a line mapper using a non-strict comma tokenizer and a bean-wrapper
	 *         field-set mapper targeting {@link Employee}
	 */
	private LineMapper<Employee> lineMapper() {
		DefaultLineMapper<Employee> lineMapper = new DefaultLineMapper<>();
		DelimitedLineTokenizer lineTokenizer = new DelimitedLineTokenizer();
		lineTokenizer.setDelimiter(",");
		lineTokenizer.setStrict(false);
		// Column names in file order; the field-set mapper binds them to Employee properties.
		lineTokenizer.setNames("First Name", "Last Name", "Email", "Phone", "Gender", "Age", "Job Title",
				"Years Of Experience", "Salary", "Department");

		BeanWrapperFieldSetMapper<Employee> fieldSetMapper = new BeanWrapperFieldSetMapper<>();
		fieldSetMapper.setTargetType(Employee.class);
		lineMapper.setLineTokenizer(lineTokenizer);
		lineMapper.setFieldSetMapper(fieldSetMapper);
		return lineMapper;
	}

	/**
	 * @return the item processor bean applied to every {@link Employee} read
	 */
	@Bean
	public EmployeeProcessor employeeProcessor() {
		return new EmployeeProcessor();
	}

	/**
	 * Builds a writer that persists items through {@link EmployeeRepository}.
	 *
	 * @return a new writer that invokes the repository's {@code save} method
	 */
	public RepositoryItemWriter<Employee> repositoryItemWriter() {
		RepositoryItemWriter<Employee> repositoryItemWriter = new RepositoryItemWriter<>();
		repositoryItemWriter.setRepository(employeeRepository);
		repositoryItemWriter.setMethodName("save");
		return repositoryItemWriter;
	}

	/**
	 * Builds the single chunk-oriented step: read CSV, process, write to the repository.
	 *
	 * @return a new step named {@code step1} with the step listener attached
	 */
	public Step step1() {
		return stepBuilderFactory.get("step1")
				.<Employee, Employee>chunk(CHUNK_SIZE)
				.reader(readDataFromFile())
				.processor(employeeProcessor())
				.writer(repositoryItemWriter())
				.listener(stepListener)
				.build();
	}

	/**
	 * Builds the import job for the file currently set via {@link #setUploadFile(String)}.
	 *
	 * @return a new job named {@code saveEmployee} consisting of {@link #step1()}
	 */
	public Job startJob() {
		return jobBuilderFactory.get("saveEmployee")
				.listener(jobListener)
				.flow(step1())
				.end()
				.build();
	}

	/**
	 * @return the name of the file the next job will read
	 */
	public String getUploadFile() {
		return uploadFile;
	}

	/**
	 * @param uploadFile name of the file, relative to the upload directory, that
	 *                   the next job should read
	 */
	public void setUploadFile(String uploadFile) {
		this.uploadFile = uploadFile;
	}

}

AppController.java


package com.example.batch.controller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.multipart.MultipartFile;

import com.example.batch.service.FileDetailService;

/**
 * REST controller exposing the file-upload endpoint under {@code /api/v1}.
 */
@RestController
@RequestMapping("/api/v1")
public class AppController {

	@Autowired
	private FileDetailService fileDetailService;

	/**
	 * Handles {@code POST /api/v1/fileUpload}.
	 *
	 * @param file multipart request part named {@code file}
	 * @return whatever response {@link FileDetailService#uploadFileDetail} produces
	 */
	@PostMapping("/fileUpload")
	public ResponseEntity<Object> uploadFile(@RequestParam("file") MultipartFile file) {
		ResponseEntity<Object> response = fileDetailService.uploadFileDetail(file);
		return response;
	}

}

Employee.java


package com.example.batch.entity;
import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;
import javax.persistence.Table;
import lombok.Getter;
import lombok.Setter;

/**
 * JPA entity stored in the {@code employee_records} table.
 *
 * <p>Accessors are generated by Lombok ({@code @Getter}/{@code @Setter}).
 */
@Getter
@Setter
@Entity
@Table(name = "employee_records")
public class Employee {

	/** Generated primary key. */
	@Id
	@GeneratedValue(strategy = GenerationType.AUTO)
	private int id;

	// Personal details
	private String firstName;
	private String lastName;
	private String email;
	private String phone;
	private String gender;
	private int age;

	// Employment details
	private String jobTitle;
	/** Kept as text rather than a number. */
	private String yearsOfExperience;
	private double salary;
	private String department;

}

FileDetailEntity.java


package com.example.batch.entity;


import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;
import javax.persistence.Table;

import lombok.Getter;
import lombok.Setter;

/**
 * JPA entity stored in the {@code file_details} table; one row per uploaded file.
 *
 * <p>Accessors are generated by Lombok ({@code @Getter}/{@code @Setter}).
 */
@Getter
@Setter
@Entity
@Table(name = "file_details")
public class FileDetailEntity {

	/** Generated primary key. */
	@Id
	@GeneratedValue(strategy = GenerationType.AUTO)
	private int id;

	/** Name of the uploaded file; the column carries a unique constraint. */
	@Column(name = "file_name", unique = true)
	private String fileName;

	/** Upload timestamp, stored as already-formatted text. */
	private String uploadDate;

}

JobListener.java


package com.example.batch.listener;

import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.stereotype.Component;
/**
 * Job-level listener that prints a marker line before and after a job runs.
 */
@Component
public class JobListener implements JobExecutionListener {

	private static final String BEFORE_MESSAGE = "Before Job....";
	private static final String AFTER_MESSAGE = "After Job....";

	/**
	 * Prints the "before" marker to standard output.
	 *
	 * @param jobExecution the execution about to start (not inspected)
	 */
	@Override
	public void beforeJob(JobExecution jobExecution) {
		System.out.println(BEFORE_MESSAGE);
	}

	/**
	 * Prints the "after" marker to standard output.
	 *
	 * @param jobExecution the finished execution (not inspected)
	 */
	@Override
	public void afterJob(JobExecution jobExecution) {
		System.out.println(AFTER_MESSAGE);
	}

}

 

StepListener.java


package com.example.batch.listener;

import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.StepExecutionListener;
import org.springframework.stereotype.Component;
/**
 * Step-level listener that prints a marker line before and after a step runs.
 */
@Component
public class StepListener implements StepExecutionListener {

	private static final String BEFORE_MESSAGE = "Before Step....";
	private static final String AFTER_MESSAGE = "After Step....";

	/**
	 * Prints the "before" marker to standard output.
	 *
	 * @param stepExecution the execution about to start (not inspected)
	 */
	@Override
	public void beforeStep(StepExecution stepExecution) {
		System.out.println(BEFORE_MESSAGE);
	}

	/**
	 * Prints the "after" marker to standard output.
	 *
	 * @param stepExecution the finished execution (not inspected)
	 * @return always {@link ExitStatus#COMPLETED}
	 */
	@Override
	public ExitStatus afterStep(StepExecution stepExecution) {
		System.out.println(AFTER_MESSAGE);
		ExitStatus reported = ExitStatus.COMPLETED;
		return reported;
	}

}

WatcherServiceEvent.java


package com.example.batch.listener;

import java.nio.file.FileSystems;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardWatchEventKinds;
import java.nio.file.WatchEvent;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;

import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;

import com.example.batch.config.BatchConfig;

import lombok.extern.slf4j.Slf4j;

@Component
@Slf4j
public class WatcherServiceEvent implements 
ApplicationListener<ApplicationReadyEvent> {
	@Value("${location}")
	private String location;
	@Autowired
	private BatchConfig batchConfig;
	@Autowired
	private JobLauncher jobLauncher;

	@Override
	public void onApplicationEvent(ApplicationReadyEvent event) {
		log.info("Watcher service enable....");
		watcherService();
	}

	private void watcherService() {
		Path directory = Paths.get(location);
		try (WatchService watchService = FileSystems.
        getDefault().newWatchService()) {
			directory.
            register(watchService, StandardWatchEventKinds.ENTRY_CREATE);

		while (true) {
			WatchKey key = watchService.take();

			for (WatchEvent event : key.pollEvents()) {

			if (event.kind() == StandardWatchEventKinds.ENTRY_CREATE) {
				Path filePath = directory.resolve((Path) event.context());
				log.info("file " + filePath.getFileName());
				if (filePath.getFileName().toString().
                  equals(event.context().toString())) {
					startJob(event.context().toString());
					}

				}

			}
				key.reset();
		}
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	private BatchStatus startJob(String uploadFileDetail) {
		JobParameters parameters = new 
        JobParametersBuilder().
        addLong("StartAt", System.currentTimeMillis())
				.toJobParameters();
		JobExecution jobExecution = null;
		try {
			batchConfig.setUploadFile(uploadFileDetail);
			jobExecution = jobLauncher.
            run(batchConfig.startJob(), parameters);

			if (jobExecution.getStatus().name().
            equalsIgnoreCase("COMPLETED")) {
				log.info("Completed.....");
			}
		} catch (Exception e) {
			log.info("Failed...");
			e.printStackTrace();
		}
		return jobExecution.getStatus();
	}

}

EmployeeProcessor.java


package com.example.batch.processor;

import org.springframework.batch.item.ItemProcessor;

import com.example.batch.entity.Employee;

/**
 * Pass-through item processor: every {@link Employee} is handed on unchanged.
 */
public class EmployeeProcessor implements ItemProcessor<Employee, Employee> {

	/**
	 * @param employee the item read from the input
	 * @return the same {@code employee} instance, unmodified
	 * @throws Exception declared by the interface contract; never thrown here
	 */
	@Override
	public Employee process(final Employee employee) throws Exception {
		return employee;
	}

}

EmployeeRepository.java


package com.example.batch.repository;

import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.stereotype.Repository;

import com.example.batch.entity.Employee;

/**
 * Spring Data JPA repository for {@link Employee} entities with {@code Integer} ids.
 * Declares no methods beyond those inherited from {@link JpaRepository}.
 */
@Repository
public interface EmployeeRepository extends JpaRepository<Employee, Integer> {
}

FileDetailRepository.java


package com.example.batch.repository;

import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.stereotype.Repository;

import com.example.batch.entity.FileDetailEntity;

/**
 * Spring Data JPA repository for {@link FileDetailEntity} entities with {@code Integer} ids.
 * Declares no methods beyond those inherited from {@link JpaRepository}.
 */
@Repository
public interface FileDetailRepository extends JpaRepository<FileDetailEntity, Integer> {
}

FileDetailService.java


package com.example.batch.service;

import java.io.File;
import java.text.SimpleDateFormat;
import java.util.Date;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Service;
import org.springframework.web.multipart.MultipartFile;

import com.example.batch.entity.FileDetailEntity;
import com.example.batch.repository.FileDetailRepository;
import com.example.batch.utility.ResponseHandler;

/**
 * Records an uploaded file in the database and stores it in the upload directory.
 */
@Service
public class FileDetailService {
	/**
	 * Timestamp pattern: {@code MM} is month and {@code HH} is hour-of-day (0-23).
	 * Lower-case {@code mm}/{@code hh} would mean minutes and a 12-hour clock.
	 */
	private static final String UPLOAD_DATE_PATTERN = "yyyy-MM-dd HH:mm:ss";

	/** Directory that receives uploaded files (property {@code location}). */
	@Value("${location}")
	private String location;
	@Autowired
	private FileDetailRepository fileDetailRepository;

	/**
	 * Persists a {@link FileDetailEntity} for the upload and then writes the file
	 * into the upload directory.
	 *
	 * <p>The record is saved before the file is written, so a save failure (for
	 * example a duplicate file name, which the entity marks unique) prevents the
	 * file from being written.
	 *
	 * @param file the uploaded multipart file
	 * @return {@code 200 OK} on success; {@code 400 Bad Request} when the upload
	 *         carries no usable file name; {@code 500 Internal Server Error} with
	 *         the exception message when saving or writing fails
	 */
	public ResponseEntity<Object> uploadFileDetail(MultipartFile file) {
		String fileName = file == null ? null : safeFileName(file.getOriginalFilename());
		if (fileName == null) {
			return ResponseHandler.generateResponse("Invalid file name", HttpStatus.BAD_REQUEST, null);
		}
		try {
			FileDetailEntity fileDetailEntity = new FileDetailEntity();
			fileDetailEntity.setFileName(fileName);
			// SimpleDateFormat is not thread-safe, so create one per request
			// instead of sharing an instance across requests.
			fileDetailEntity.setUploadDate(new SimpleDateFormat(UPLOAD_DATE_PATTERN).format(new Date()));

			fileDetailRepository.save(fileDetailEntity);

			file.transferTo(new File(location + File.separator + fileName));

			return ResponseHandler.generateResponse("Successfully File upload", HttpStatus.OK, null);
		} catch (Exception e) {
			e.printStackTrace();
			// Report failures with an error status; 207 MULTI_STATUS is a success code.
			return ResponseHandler.generateResponse(e.getMessage(), HttpStatus.INTERNAL_SERVER_ERROR, null);
		}
	}

	/**
	 * Reduces a client-supplied file name to its last path element so it cannot
	 * point outside the upload directory.
	 *
	 * @param originalName the name sent by the client; may be {@code null}
	 * @return the bare file name, or {@code null} when no usable name remains
	 */
	private static String safeFileName(String originalName) {
		if (originalName == null) {
			return null;
		}
		String name = new File(originalName).getName();
		if (name.isEmpty() || ".".equals(name) || "..".equals(name)) {
			return null;
		}
		return name;
	}

}

ResponseHandler.java


package com.example.batch.utility;

import java.util.HashMap;
import java.util.Map;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;

/**
 * Helper that builds the uniform response body used by the REST endpoints.
 */
public class ResponseHandler {

	/**
	 * Wraps a message, status code and payload in a map-based response.
	 *
	 * @param message     human-readable message, stored under key {@code message}
	 * @param status      HTTP status; its numeric value is stored under key
	 *                    {@code status} and it is also used as the response status
	 * @param responseObj payload stored under key {@code data}; may be {@code null}
	 * @return a response entity whose body is the assembled map
	 */
	public static ResponseEntity<Object> generateResponse(String message, HttpStatus status, Object responseObj) {
		Map<String, Object> map = new HashMap<>();
		map.put("message", message);
		map.put("status", status.value());
		map.put("data", responseObj);

		return new ResponseEntity<>(map, status);
	}

}

SpringBatchExampleApplication.java


package com.example.batch;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class SpringBatchExampleApplication {

	public static void main(String[] args) {
		SpringApplication.run(SpringBatchExampleApplication.class, args);
	}

}

application.properties


 server.port=9090
 #DATABASE CONFIGURATION HERE
 spring.datasource.url=jdbc:mysql://localhost:3306/batchtutorial
 spring.datasource.username=root
 spring.datasource.password=password
 # Connector/J 8 driver class; com.mysql.jdbc.Driver is the deprecated legacy name
 spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
 spring.jpa.hibernate.ddl-auto=update

 #BATCH CONFIGURATION HERE
 spring.batch.initialize-schema=always
 spring.batch.job.enabled=false

 #UPLOAD DIRECTORY (local folder that receives uploads and is watched for new files)
 location=F://upload

 

pom.xml


<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
    https://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.1.3.RELEASE</version>
		<relativePath /> <!-- lookup parent from repository -->
	</parent>
	<groupId>com.example</groupId>
	<artifactId>Spring-Batch-Example</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<name>Spring-Batch-Example</name>
	<description>Demo project for Spring Boot</description>
	<properties>
		<java.version>1.8</java.version>
	</properties>
	<dependencies>
		<!-- Declared once; a duplicate declaration makes Maven emit a model warning -->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-batch</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-data-jpa</artifactId>
		</dependency>
		<!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
		<dependency>
			<groupId>org.projectlombok</groupId>
			<artifactId>lombok</artifactId>
			<scope>provided</scope>
		</dependency>
		<!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
		<!-- The JDBC driver is only needed when the application runs, not to compile it -->
		<dependency>
			<groupId>mysql</groupId>
			<artifactId>mysql-connector-java</artifactId>
			<scope>runtime</scope>
		</dependency>
	</dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>

</project>

Postman Request

Database Tables

No comments: