Imagine you have the following scenario:
First, let’s define our demo filesystem structure.
We have 2 incoming folders “headers” and “data”, where the “headers”-folder contains the “header”-files and the “data”-folder the “data”-files respectively.
../in/headers
header-001.txt
header-002.txt
header-003.txt
header-004.txt
header-005.txt
../in/data
data-001.txt
data-002.txt
data-003.txt
data-004.txt
data-005.txt
For the sake of convenience each “header”-file contains only one line with the name of the corresponding “data”-file. Something like this:
header-001.txt
data-001.txt
The first step is to create a route that polls the “headers”-folder, logs the “header”-file found, reads its content as a string and sends it as the message body to the subsequent node.
from("file:target/in/headers")
    .to("log:net.javaforge.blog.file.header?level=INFO")
    .transform(body(String.class))
    .to("mock:out");
From now on the outgoing message body contains the content of the “header”-file, namely the name of the associated “data”-file.
The next step is to implement a processor that consumes the “data”-file and sends it as a new exchange to the subsequent recipient.
This is where I ended up:
class HeaderFileBodyProcessor implements Processor {

    private String dataFileFolderURI;
    private String recipientEndpointURI;
    private int dataFileReadTimeout;

    HeaderFileBodyProcessor(String dataFileFolderURI, String recipientEndpointURI, int dataFileReadTimeout) {
        this.dataFileFolderURI = dataFileFolderURI;
        this.recipientEndpointURI = recipientEndpointURI;
        this.dataFileReadTimeout = dataFileReadTimeout;
    }

    @Override
    public void process(Exchange exchange) throws Exception {
        Exchange data = this.receiveBodyFile(exchange);
        this.sendToNextEndpoint(exchange, data);
    }

    private Exchange receiveBodyFile(Exchange originalExchange) throws Exception {
        String filename = originalExchange.getIn().getBody(String.class);
        ConsumerTemplate consumer = originalExchange.getContext().createConsumerTemplate();
        try {
            return consumer.receive(this.dataFileFolderURI + "?fileName=" + filename, dataFileReadTimeout);
        } finally {
            // stop the consumer as it does not need to poll for files anymore
            consumer.stop();
        }
    }

    private void sendToNextEndpoint(Exchange originalExchange, Exchange dataFileExchange) throws Exception {
        if (dataFileExchange != null) {
            ProducerTemplate prod = originalExchange.getContext().createProducerTemplate();
            try {
                prod.send(this.recipientEndpointURI, dataFileExchange);
            } finally {
                prod.stop();
            }
        } else {
            // rollback if no body file...
            throw new CamelExchangeException("Cannot find the body file "
                + originalExchange.getIn().getBody(String.class), originalExchange);
        }
    }
}
For each incoming “data”-file name the processor creates a new file consumer that tries to read the data-file within the specified timeout interval. After the data file has been read, the consumer is explicitly stopped, since it does not need to poll for files anymore. If the data-file was found, it is sent as a new exchange to the next recipient; otherwise a CamelExchangeException is thrown. This exception forces Apache Camel to roll back the incoming “header”-file exchange and poll it again later.
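Conceptually, the timed receive used above behaves like a simple poll loop: check whether the file exists, sleep briefly, and give up once the timeout elapses. The following stdlib-only Java sketch illustrates that idea (the FilePoller class and its names are a hypothetical illustration, not Camel API):

```java
import java.nio.file.Files;
import java.nio.file.Path;

public class FilePoller {

    // Poll `dir` for a file named `name`, for at most `timeoutMillis` milliseconds.
    // Returns the path if the file appeared in time, or null on timeout --
    // roughly what consumer.receive(uri, timeout) does for the file component.
    public static Path receive(Path dir, String name, long timeoutMillis) throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        Path candidate = dir.resolve(name);
        while (System.currentTimeMillis() < deadline) {
            if (Files.exists(candidate)) {
                return candidate;
            }
            Thread.sleep(50); // small poll interval, analogous to the file component's delay
        }
        return null;
    }
}
```

The null-on-timeout contract is what allows the processor above to distinguish a found data-file from a missing one and trigger the rollback.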
The extended Camel route now looks like this:
from("file:target/in/headers")
    .to("log:net.javaforge.blog.file.header?level=INFO")
    .transform(body(String.class))
    .process(new HeaderFileBodyProcessor("file://target/in/data", "direct:data", 60 * 1000))
    .to("mock:out");
Found data-files will thus end up in the “direct:data” endpoint. The last step is to read and process them from “direct:data”:
from("direct:data")
    .to("log:net.javaforge.blog.file.body?level=INFO")
    .process(new DataFileProcessor())
    .to("mock:data");
class DataFileProcessor implements Processor {

    private static final Logger log = LoggerFactory.getLogger(DataFileProcessor.class);

    @Override
    public void process(Exchange exchange) throws Exception {
        log.info("processing incoming data file exchange: {}", exchange);
    }
}
Due to the default behavior of Apache Camel’s “file”-component, all successfully processed files will end up in a “.camel”-subfolder.
This can be improved using the file component’s options. For example:
from("file:target/in/headers?move=done/${date:now:yyyyMMddhhmmss}/${file:name}")
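The ${date:now:yyyyMMddhhmmss} expression in the move option resolves to a timestamp, so each processed file lands in a time-stamped “done” subfolder. A plain-Java sketch of the resulting path (the DonePath helper is hypothetical, shown only to make the pattern concrete):

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class DonePath {

    // Build the target path the move option would produce for a given moment,
    // e.g. "done/20240131054210/header-001.txt".
    public static String movedPath(LocalDateTime now, String fileName) {
        String stamp = now.format(DateTimeFormatter.ofPattern("yyyyMMddhhmmss"));
        return "done/" + stamp + "/" + fileName;
    }
}
```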
There is only one thing in our route definition that is still not good enough: it is blocking. For each incoming “header”-file, processing waits up to the full timeout period for the associated “data”-file, even if other “header”- and “data”-files are available and could be processed in parallel.
Fixing this is simple: just add multi-threading to our route definition:
from("file:target/in/headers")
    .to("log:net.javaforge.blog.file.header?level=INFO")
    .transform(body(String.class))
    .threads(10)
    .process(new HeaderFileBodyProcessor("file://target/in/data", "direct:data", 60 * 1000))
    .to("mock:out");
Voila! Now we can process header-files in 10 parallel threads without blocking…
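The effect of threads(10) is that exchanges are handed off to a thread pool, so a slow header no longer blocks the others. A stdlib-only sketch of the same hand-off pattern (the ParallelHeaders class is a hypothetical illustration, not Camel API):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ParallelHeaders {

    // Process each header name on a pool of 10 worker threads,
    // analogous to what threads(10) does for exchanges in the route.
    public static List<String> processAll(List<String> headers) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(10);
        try {
            List<Future<String>> futures = new ArrayList<>();
            for (String h : headers) {
                // each header is submitted independently, so one slow
                // (or timing-out) header does not delay the others
                futures.add(pool.submit(() -> "processed:" + h));
            }
            List<String> results = new ArrayList<>();
            for (Future<String> f : futures) {
                results.add(f.get());
            }
            return results;
        } finally {
            pool.shutdown();
        }
    }
}
```

With the pool in place, a header whose data-file never arrives only occupies one of the ten workers for its timeout interval, while the remaining workers keep draining the folder.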