Spring Batch and MongoDB
- AbstractPaginatedDataItemReader.class
- GemfireItemWriter.class
- MongoItemReader.class
- MongoItemWriter.class
- Neo4jItemReader.class
- Neo4jItemWriter.class
- RepositoryItemReader.class
- RepositoryItemWriter.class
- SpELMappingGemfireItemWriter.class
Spring Batch
Spring Batch is a Spring-based framework for enterprise Java batch processing. An important aspect of Spring Batch is the separation between reading from and writing to resources and the processing of a single record, called an item in Spring Batch lingo. There are many existing item readers and writers for a wide range of resources such as JDBC databases, JMS messaging systems, and flat files. If the resource of your choice is not supported out of the box, it is easy to implement your own reader and writer, as we will see in a minute.
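The separation of reading, processing and writing can be pictured with a small plain-Java sketch. The Reader, Processor and Writer interfaces and the runStep method below are hypothetical stand-ins written for this illustration (assuming Java 8 or later); they are not the Spring Batch interfaces, and the real framework adds transactions, restart and error handling on top of this loop.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

final class ChunkSketch {

    // Hypothetical stand-ins for the reader, processor and writer roles.
    interface Reader<T> { T read(); }                 // returns null when the input is exhausted
    interface Processor<I, O> { O process(I item); }  // returns null to drop the item
    interface Writer<T> { void write(List<T> chunk); }

    // Reads items one by one, processes each, and writes the survivors
    // after every commitInterval items that were read.
    static <I, O> void runStep(Reader<I> reader, Processor<I, O> processor,
                               Writer<O> writer, int commitInterval) {
        List<O> chunk = new ArrayList<>();
        int readInChunk = 0;
        I item;
        while ((item = reader.read()) != null) {
            readInChunk++;
            O result = processor.process(item);
            if (result != null) {
                chunk.add(result);
            }
            if (readInChunk == commitInterval) {
                if (!chunk.isEmpty()) {
                    writer.write(new ArrayList<>(chunk));
                }
                chunk.clear();
                readInChunk = 0;
            }
        }
        // Write whatever is left over from the last, incomplete chunk.
        if (!chunk.isEmpty()) {
            writer.write(new ArrayList<>(chunk));
        }
    }

    // Demo: upper-cases words, drops empty ones, and records every written chunk.
    static List<List<String>> demo(List<String> input, int commitInterval) {
        Iterator<String> it = input.iterator();
        List<List<String>> written = new ArrayList<>();
        Reader<String> reader = () -> it.hasNext() ? it.next() : null;
        Processor<String, String> processor = s -> s.isEmpty() ? null : s.toUpperCase();
        Writer<String> writer = written::add;
        runStep(reader, processor, writer, commitInterval);
        return written;
    }

    public static void main(String[] args) {
        System.out.println(demo(Arrays.asList("a", "", "b", "c", "d"), 2));
    }
}
```

The reader and writer know nothing about each other, which is why either one can be swapped for a different resource without touching the processing logic.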
MongoDB
MongoDB is a popular NoSQL datastore. It stores so-called documents (basically an ordered set of key/value pairs, where a value can be a simple data type like a string or an integer, but also an array of values or a sub-document). MongoDB is optimized for heavy write throughput and horizontal scaling.
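As a rough picture of such a document, the following sketch models one with plain Java collections. It is only an analogy: a LinkedHashMap stands in for the ordered key/value pairs, and the skills array and the nested address sub-document are invented values for illustration.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;

final class DocumentSketch {

    // Builds a map that mimics the shape of a MongoDB document.
    static Map<String, Object> sampleEmployee() {
        Map<String, Object> address = new LinkedHashMap<>();   // sub-document
        address.put("city", "delhi");

        Map<String, Object> doc = new LinkedHashMap<>();       // keeps insertion order
        doc.put("name", "Dinesh Rajput");                      // simple value: string
        doc.put("age", 27);                                    // simple value: integer
        doc.put("skills", Arrays.asList("java", "spring"));    // array of values (invented)
        doc.put("address", address);                           // nested sub-document (invented)
        return doc;
    }

    public static void main(String[] args) {
        System.out.println(sampleEmployee().keySet());
    }
}
```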
Since I am a big fan of MongoDB on the one hand and am introducing the Spring Batch framework at one of my customers on the other, why not combine them: read items with a Spring Batch item reader (an XML reader in the example below) and write them to MongoDB with MongoItemWriter?
MongoDB Item Writer - MongoItemWriter
My first approach to the item writer was very naive. I just took the DBObject item list and inserted them into the target collection. This can be done with the following configuration:
<!-- write it to MongoDB, 'employee' collection (table) -->
<bean id="mongodbItemWriter" class="org.springframework.batch.item.data.MongoItemWriter">
    <property name="template" ref="mongoTemplate" />
    <property name="collection" value="employee" />
</bean>
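These are the possible parameters:
template and collection determine the MongoTemplate to use and the collection to write to. The template is required; if collection is omitted, MongoItemWriter leaves it to the template to derive the collection from the item's type. All other parameters are optional.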
MongoDB Item Reader - MongoItemReader
Implementing the item reader was straightforward. It was merely a matter of passing parameters to the underlying MongoDB driver API.
<!-- read it from MongoDB, 'employee' collection (table) -->
<bean id="mongodbItemReader" class="org.springframework.batch.item.data.MongoItemReader">
    <property name="template" ref="mongoTemplate" />
    <property name="query" value="{age: {$gt: 22}}" />
</bean>
We have three kinds of parameters:
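template and collection determine the MongoTemplate to use and the collection to read from. The template is required. Note that the MongoItemReader shipped with Spring Batch also checks at startup for a targetType (the class each document is mapped to) and a sort order, so a complete configuration needs those two properties as well.
query and keys make up the MongoDB query. The first is the query itself in JSON form; the second selects the fields to read (on the stock MongoItemReader this property is called fields). To read all documents from the collection, use the empty query {}.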
By default, the item reader emits DBObject instances that come from the MongoDB driver API. These objects are basically ordered hashmaps. If you want to use another representation of your data in the item processor, you can write a custom converter.
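import org.springframework.core.convert.converter.Converter;

import com.doj.batch.bean.Employee;
import com.mongodb.DBObject;

public class DocumentEmployeeConverter implements Converter<DBObject, Employee> {

    @Override
    public Employee convert(DBObject document) {
        Employee emp = new Employee();
        // Numeric fields are read as Number so the conversion works whether the
        // driver returns an Integer, a Long or a Double.
        // Assumption: _id holds the numeric employee id (not a generated ObjectId).
        emp.setEmpid(((Number) document.get("_id")).intValue());
        emp.setName((String) document.get("name"));
        emp.setAge(((Number) document.get("age")).intValue());
        emp.setSalary(((Number) document.get("salary")).floatValue());
        emp.setAddress((String) document.get("address"));
        return emp;
    }
}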
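To make the query and keys parameters described above more concrete, here is a plain-Java sketch of what a query like {age: {$gt: 22}} combined with a field selection does to a set of documents. The find method is a hypothetical in-memory stand-in (not the MongoDB driver API) that only understands a single "greater than" condition; the names and ages are taken from the employees.xml sample used later in this article.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class QuerySketch {

    // Keeps documents whose numeric field exceeds the bound (like {field: {$gt: bound}})
    // and copies only the requested keys into the result (like a field selection).
    static List<Map<String, Object>> find(List<Map<String, Object>> docs,
                                          String field, double bound, List<String> keys) {
        List<Map<String, Object>> result = new ArrayList<>();
        for (Map<String, Object> doc : docs) {
            Object value = doc.get(field);
            if (value instanceof Number && ((Number) value).doubleValue() > bound) {
                Map<String, Object> projected = new LinkedHashMap<>();
                for (String key : keys) {
                    if (doc.containsKey(key)) {
                        projected.put(key, doc.get(key));
                    }
                }
                result.add(projected);
            }
        }
        return result;
    }

    // Helper that builds a two-field document.
    static Map<String, Object> doc(String name, int age) {
        Map<String, Object> d = new LinkedHashMap<>();
        d.put("name", name);
        d.put("age", age);
        return d;
    }

    // Names and ages from the employees.xml sample.
    static List<Map<String, Object>> sample() {
        return Arrays.asList(
                doc("ATUL KUMAR", 17),
                doc("Dinesh Rajput", 27),
                doc("ASHUTOSH RAJPUT", 21),
                doc("Adesh Verma", 27),
                doc("Dinesh Rajput", 37));
    }

    public static void main(String[] args) {
        // Equivalent in spirit to query {age: {$gt: 22}} with only the name field selected.
        System.out.println(find(sample(), "age", 22, Arrays.asList("name")));
    }
}
```

Three of the five sample employees are older than 22, so only those three documents reach the item processor, each reduced to its name field.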
Example: XML File to MongoDB Database
Now we will discuss how to configure a Spring Batch job that reads data from an XML file (unmarshalled with JAXB) and writes it into a NoSQL database (MongoDB). In addition, we launch the batch job to test it.
Tools and libraries used
- Spring Tool Suite (STS)
- JDK 1.6
- Spring Core 3.2.2.RELEASE
- Spring OXM 3.2.2.RELEASE
- Spring Batch 2.2.0.RELEASE
- MongoDB Java Driver 2.7.3
- MongoDB 2.10.1
1. Project Directory Structure
2. Input XML File
employees.xml
<?xml version="1.0" encoding="UTF-8"?>
<employees>
    <employee>
        <address>delhi</address>
        <age>17</age>
        <empid>1111</empid>
        <name>ATUL KUMAR</name>
        <salary>300000.0</salary>
    </employee>
    <employee>
        <address>delhi</address>
        <age>27</age>
        <empid>2222</empid>
        <name>Dinesh Rajput</name>
        <salary>60000.0</salary>
    </employee>
    <employee>
        <address>delhi</address>
        <age>21</age>
        <empid>3333</empid>
        <name>ASHUTOSH RAJPUT</name>
        <salary>400000.0</salary>
    </employee>
    <employee>
        <address>Kanpur</address>
        <age>27</age>
        <empid>4444</empid>
        <name>Adesh Verma</name>
        <salary>80000.0</salary>
    </employee>
    <employee>
        <address>Noida</address>
        <age>37</age>
        <empid>5555</empid>
        <name>Dinesh Rajput</name>
        <salary>300000.0</salary>
    </employee>
</employees>
3. ItemReader for XML File
In this example, we use Jaxb2Marshaller to map XML values and attributes to an object.
<bean id="xmlItemReader" class="org.springframework.batch.item.xml.StaxEventItemReader">
    <property name="resource" value="classpath:xml/employees.xml" />
    <property name="unmarshaller" ref="empUnMarshaller" />
    <property name="fragmentRootElementName" value="employee" />
</bean>
Employee.java
package com.doj.batch.bean;

import javax.xml.bind.annotation.XmlAccessOrder;
import javax.xml.bind.annotation.XmlAccessorOrder;
import javax.xml.bind.annotation.XmlRootElement;

/**
 * @author Dinesh Rajput
 *
 */
@XmlRootElement(name="employee")
@XmlAccessorOrder(XmlAccessOrder.UNDEFINED)
public class Employee {

    private int empid;
    private String name;
    private int age;
    private float salary;
    private String address;

    /**
     * @return the empid
     */
    public int getEmpid() {
        return empid;
    }

    /**
     * @param empid the empid to set
     */
    public void setEmpid(int empid) {
        this.empid = empid;
    }

    /**
     * @return the name
     */
    public String getName() {
        return name;
    }

    /**
     * @param name the name to set
     */
    public void setName(String name) {
        this.name = name;
    }

    /**
     * @return the age
     */
    public int getAge() {
        return age;
    }

    /**
     * @param age the age to set
     */
    public void setAge(int age) {
        this.age = age;
    }

    /**
     * @return the salary
     */
    public float getSalary() {
        return salary;
    }

    /**
     * @param salary the salary to set
     */
    public void setSalary(float salary) {
        this.salary = salary;
    }

    /**
     * @return the address
     */
    public String getAddress() {
        return address;
    }

    /**
     * @param address the address to set
     */
    public void setAddress(String address) {
        this.address = address;
    }
}
4. MongoDB configuration
mongodbConfig.xml
<?xml version="1.0" encoding="UTF-8"?>
<!-- The spring-beans schema is referenced without a version so that it resolves
     against whichever Spring release is on the classpath (3.2.2 in this example). -->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:mongo="http://www.springframework.org/schema/data/mongo"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/data/mongo
           http://www.springframework.org/schema/data/mongo/spring-mongo-1.0.xsd">

    <mongo:mongo host="127.0.0.1" port="27017" />
    <mongo:db-factory dbname="davdb" id="mongoDbFactory"/>

    <bean id="mongoTemplate" class="org.springframework.data.mongodb.core.MongoTemplate">
        <constructor-arg name="mongoDbFactory" ref="mongoDbFactory" />
    </bean>
</beans>
5. Spring Batch Core configuration
Define jobRepository and jobLauncher.
applicationContext.xml
<?xml version="1.0" encoding="UTF-8"?>
<!-- Schemas are referenced without a version so that they resolve against
     whichever Spring release is on the classpath (3.2.2 in this example). -->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:p="http://www.springframework.org/schema/p"
       xmlns:mvc="http://www.springframework.org/schema/mvc"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/context
           http://www.springframework.org/schema/context/spring-context.xsd
           http://www.springframework.org/schema/mvc
           http://www.springframework.org/schema/mvc/spring-mvc.xsd">

    <bean id="transactionManager" class="org.springframework.batch.support.transaction.ResourcelessTransactionManager"/>

    <bean id="jobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
        <property name="jobRepository" ref="jobRepository"/>
    </bean>

    <!-- In-memory job repository: no metadata tables are needed -->
    <bean id="jobRepository" class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean">
        <property name="transactionManager" ref="transactionManager"/>
    </bean>

    <bean id="simpleJob" class="org.springframework.batch.core.job.SimpleJob" abstract="true">
        <property name="jobRepository" ref="jobRepository" />
    </bean>
</beans>
6. Spring Batch Jobs Configuration file
First, I define the simple-job.xml and mongodbConfig.xml configuration files. In simple-job.xml, I declare an org.springframework.batch.item.xml.StaxEventItemReader, which is a class from the Spring Batch framework. Its resource property is the path of the input XML file, here classpath:xml/employees.xml. I also define the unmarshaller that converts the XML data into Java objects of the Employee class, and set fragmentRootElementName to employee, the element that marks one item. Items can then be filtered by my EmployeeFilterProcessor class, which implements the ItemProcessor interface of the Spring Batch framework.
After this, I specify the MongoDB details in mongodbConfig.xml: the hostname where the database is installed and the port number. I access the database through the MongoTemplate, which is constructed from the mongoDbFactory bean; that factory names the database I will work with inside MongoDB, in this case "davdb". The item writer is Spring Batch's MongoItemWriter, an implementation of the ItemWriter interface, which uses the MongoTemplate to write each item to the employee collection.
Next, I specify the DynamicJobParameters class, which implements the JobParametersIncrementer interface from Spring Batch. This works as the incrementer for the job; it does not appear in the configuration listed here.
Finally, I specify my batch job with its batch:step and batch:tasklet details. The batch job here is simpleDojJob, which contains a single step whose tasklet works in chunks (batch:chunk): it reads items from the xmlItemReader, passes each one through the processor, and hands the results to the item writer, committing after every two items (commit-interval="2").
<?xml version="1.0" encoding="UTF-8"?>
<!-- Core Spring schemas are referenced without a version so that they resolve
     against whichever Spring release is on the classpath. -->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:p="http://www.springframework.org/schema/p"
       xmlns:batch="http://www.springframework.org/schema/batch"
       xmlns:mvc="http://www.springframework.org/schema/mvc"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/context
           http://www.springframework.org/schema/context/spring-context.xsd
           http://www.springframework.org/schema/mvc
           http://www.springframework.org/schema/mvc/spring-mvc.xsd
           http://www.springframework.org/schema/batch
           http://www.springframework.org/schema/batch/spring-batch-2.0.xsd">

    <import resource="applicationContext.xml"/>
    <import resource="mongodbConfig.xml"/>

    <!-- filters out employees below the salary threshold -->
    <bean id="employeeFilterProcessor" class="com.doj.batch.processor.EmployeeFilterProcessor" />

    <bean id="xmlItemReader" class="org.springframework.batch.item.xml.StaxEventItemReader">
        <property name="resource" value="classpath:xml/employees.xml" />
        <property name="unmarshaller" ref="empUnMarshaller" />
        <property name="fragmentRootElementName" value="employee" />
    </bean>

    <bean id="empUnMarshaller" class="org.springframework.oxm.jaxb.Jaxb2Marshaller">
        <property name="classesToBeBound">
            <value>com.doj.batch.bean.Employee</value>
        </property>
    </bean>

    <!-- write it to MongoDB, 'employee' collection (table) -->
    <bean id="mongodbItemWriter" class="org.springframework.batch.item.data.MongoItemWriter">
        <property name="template" ref="mongoTemplate" />
        <property name="collection" value="employee" />
    </bean>

    <batch:job id="simpleDojJob" parent="simpleJob">
        <batch:step id="step1">
            <batch:tasklet>
                <batch:chunk reader="xmlItemReader" processor="employeeFilterProcessor"
                             writer="mongodbItemWriter" commit-interval="2"/>
            </batch:tasklet>
        </batch:step>
    </batch:job>
</beans>
7. EmployeeFilterProcessor.java
package com.doj.batch.processor;

import org.springframework.batch.item.ItemProcessor;

import com.doj.batch.bean.Employee;

/**
 * @author Dinesh Rajput
 *
 */
public class EmployeeFilterProcessor implements ItemProcessor<Employee, Employee> {

    @Override
    public Employee process(Employee emp) throws Exception {
        // Returning null tells Spring Batch to skip the item, so it never reaches the writer.
        if (emp.getSalary() > 70000.0) {
            return emp;
        } else {
            return null;
        }
    }
}
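To see what this filter does to the sample input, the following plain-Java sketch applies the same salary rule to the five employees from employees.xml. The Emp class and the survivingIds helper are simplified, hypothetical stand-ins for this illustration; they do not depend on Spring Batch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class FilterSketch {

    // Simplified stand-in for the Employee bean: only the fields the filter needs.
    static final class Emp {
        final int empid;
        final float salary;

        Emp(int empid, float salary) {
            this.empid = empid;
            this.salary = salary;
        }
    }

    // Same rule as EmployeeFilterProcessor: keep the item only above the threshold.
    static Emp process(Emp emp) {
        return emp.salary > 70000.0 ? emp : null;
    }

    // Runs every item through the filter and collects the ids that are not dropped.
    static List<Integer> survivingIds(List<Emp> input) {
        List<Integer> ids = new ArrayList<>();
        for (Emp emp : input) {
            Emp result = process(emp);
            if (result != null) {
                ids.add(result.empid);
            }
        }
        return ids;
    }

    // Employee ids and salaries from the employees.xml sample.
    static List<Emp> sampleInput() {
        return Arrays.asList(
                new Emp(1111, 300000.0f),
                new Emp(2222, 60000.0f),
                new Emp(3333, 400000.0f),
                new Emp(4444, 80000.0f),
                new Emp(5555, 300000.0f));
    }

    public static void main(String[] args) {
        System.out.println(survivingIds(sampleInput()));
    }
}
```

Only employee 2222, with a salary of 60000.0, falls below the threshold, so four of the five employees are handed to the writer.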
Launching the Batch Job
Spring Batch comes with a simple utility class called CommandLineJobRunner, whose main() method accepts two arguments. The first argument is the Spring application context file containing the job definition, and the second is the name of the job to be executed.
Now run it as a Java application with these two arguments:
org.springframework.batch.core.launch.support.CommandLineJobRunner
simple-job.xml simpleDojJob
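Output. The job metadata is held in memory (the configuration uses MapJobRepositoryFactoryBean, so no metadata tables are created), and the employees from employees.xml that pass the salary filter are inserted into the "employee" collection of the MongoDB database "davdb".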
Download Source Code with Jars
SpringBatchXMLtoMongoDB.zip