Processing dependent files with Apache Camel

Imagine you have the following scenario:

  1. A folder has to be polled for files (I will call them “header”-files) that contain only the meta-information (e.g. the filesystem location) of the real “data”-file(s) to be processed.
  2. From each incoming “header”-file you have to read the filesystem location of the “data”-file(s) and process them from there.
  3. Both the “header”- and the “data”-file(s) should be moved to their respective “done”-folder after processing. The “header”-file may only be moved, however, if the corresponding “data”-file(s) was/were processed successfully.
  4. If the corresponding “data”-file does not exist, the “header”-file message should be rolled back and the “header”-file must be processed again after the specified timeout.

First, let’s define our demo filesystem structure.
We have two incoming folders, “headers” and “data”: the “headers”-folder contains the “header”-files and the “data”-folder contains the “data”-files.

../in/headers

header-001.txt
header-002.txt
header-003.txt
header-004.txt
header-005.txt

../in/data

data-001.txt
data-002.txt
data-003.txt
data-004.txt
data-005.txt

For the sake of convenience, each “header”-file contains only one row with the name of the corresponding “data”-file. Something like this:

header-001.txt

data-001.txt

The first step is to create the route that polls the “headers”-folder, logs each “header”-file found, reads its content as a string and sends it as the message body to the subsequent node.

from("file:target/in/headers")
   .to("log:net.javaforge.blog.file.header?level=INFO")
   .transform(body(String.class))
   .to("mock:out");

From now on the outgoing message body contains the content of the “header”-file, namely the name of the associated “data”-file.
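One practical detail: a file's content read as a string usually carries a trailing line break, so the body is not necessarily the bare file name yet. The following is a minimal plain-JDK sketch of normalising it. The class `HeaderContentSketch` and its method are hypothetical helpers for illustration only, not part of Camel or of the route above.

```java
// Hypothetical helper: extracts the data-file name from the raw header-file body.
class HeaderContentSketch {

    // Returns the first non-blank line of the header body, without surrounding whitespace.
    static String dataFileName(String headerBody) {
        for (String line : headerBody.split("\\R")) { // \R matches any line break
            String candidate = line.trim();
            if (!candidate.isEmpty()) {
                return candidate;
            }
        }
        throw new IllegalArgumentException("header file names no data file");
    }
}
```

With a helper like this, a body of `"data-001.txt\r\n"` would resolve to `data-001.txt` before it is used to look up the “data”-file.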

The next step is to implement a processor that consumes the “data”-file and sends it on as a new exchange to the subsequent recipient.
This is what I ended up with:

class HeaderFileBodyProcessor implements Processor {

    private String dataFileFolderURI;
    private String recipientEndpointURI;
    private int dataFileReadTimeout;

    HeaderFileBodyProcessor(String dataFileFolderURI, String recipientEndpointURI, int dataFileReadTimeout) {
        this.dataFileFolderURI = dataFileFolderURI;
        this.recipientEndpointURI = recipientEndpointURI;
        this.dataFileReadTimeout = dataFileReadTimeout;
    }

    @Override
    public void process(Exchange exchange) throws Exception {
        Exchange data = this.receiveBodyFile(exchange);
        this.sendToNextEndpoint(exchange, data);
    }

    private Exchange receiveBodyFile(Exchange originalExchange) throws Exception {
        // trim, because the header file content may end with a line break
        String filename = originalExchange.getIn().getBody(String.class).trim();
        ConsumerTemplate consumer =
                originalExchange.getContext().createConsumerTemplate();
        try {
            return consumer.receive(this.dataFileFolderURI + "?fileName=" + filename, dataFileReadTimeout);
        } finally {
            // stop the consumer as it does not need to poll for files anymore 
            consumer.stop();
        }
    }

    private void sendToNextEndpoint(Exchange originalExchange, Exchange dataFileExchange) throws Exception {
        if (dataFileExchange != null) {
            ProducerTemplate prod =
                    originalExchange.getContext().createProducerTemplate();
            try {
                prod.send(this.recipientEndpointURI, dataFileExchange);
            } finally {
                prod.stop();
            }
        } else {
            // rollback if no body file... 
            throw new CamelExchangeException("Cannot find the body file " +
                    originalExchange.getIn().getBody(String.class), originalExchange);
        }
    }
} 

For each incoming “data”-file name the processor creates a new file consumer that tries to read the data-file within the specified timeout interval. Once the read attempt has finished, the consumer is stopped explicitly, since it does not need to poll for files anymore. If the data-file was found, it is sent as a new exchange to the next recipient; otherwise a CamelExchangeException is thrown. This exception forces Apache Camel to roll back the incoming “header”-file exchange and poll it again later.
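The "wait for the file, but only up to a timeout" behaviour described above can be illustrated without Camel. The sketch below is a plain-JDK stand-in for the idea, not how Camel's file consumer is implemented. The class name, method name and polling interval are assumptions made for this example.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Optional;

// Hypothetical stand-in for "receive with timeout": polls until the file exists or time runs out.
class DataFileWaitSketch {

    static Optional<Path> waitForFile(Path file, long timeoutMillis, long pollMillis)
            throws InterruptedException {
        long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        while (true) {
            if (Files.isRegularFile(file)) {
                return Optional.of(file);   // found: the caller can process it
            }
            if (System.nanoTime() - deadline >= 0) {
                return Optional.empty();    // timed out: the caller should fail the header exchange
            }
            Thread.sleep(pollMillis);       // wait a little before looking again
        }
    }
}
```

An empty result here corresponds to the `null` exchange in the processor, which is the case that ends in the CamelExchangeException.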

The extended Camel route now looks like this:

from("file:target/in/headers")
   .to("log:net.javaforge.blog.file.header?level=INFO") 
   .transform(body(String.class)) 
   .process(new HeaderFileBodyProcessor("file://target/in/data", "direct:data", 60 * 1000)) 
   .to("mock:out");    

The data-files found this way end up in the “direct:data” endpoint. The last step is to read and process them from “direct:data”:

from("direct:data") 
   .to("log:net.javaforge.blog.file.body?level=INFO") 
   .process(new DataFileProcessor())
   .to("mock:data");

class DataFileProcessor implements Processor {

    private static final Logger log = LoggerFactory.getLogger(DataFileProcessor.class);

    @Override
    public void process(Exchange exchange) throws Exception {
        log.info("processing incoming data file exchange: {}", exchange);
    }
}     

By default, Apache Camel’s “file”-component moves every successfully processed file into a “.camel”-subfolder of the folder it was read from:

  • ../in/headers/.camel
  • ../in/data/.camel

This can be changed using the file-component’s options, for example “move”:

from("file:target/in/headers?move=done/${date:now:yyyyMMddHHmmss}/${file:name}")
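One detail worth knowing about such date patterns: to my knowledge Camel formats them with `java.text.SimpleDateFormat`, where `hh` is the 12-hour clock and `HH` the 24-hour clock. The plain-JDK sketch below shows the difference for a sample afternoon timestamp. The class and its sample date are made up for this illustration.

```java
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
import java.util.GregorianCalendar;
import java.util.Locale;

// Hypothetical demo of how the hour letters affect the generated folder name.
class DoneFolderPatternSketch {

    static String format(String pattern, Date when) {
        return new SimpleDateFormat(pattern, Locale.ROOT).format(when);
    }

    // 15 January 2024, 15:04:05 in the default time zone
    static Date afternoonSample() {
        return new GregorianCalendar(2024, Calendar.JANUARY, 15, 15, 4, 5).getTime();
    }

    public static void main(String[] args) {
        Date when = afternoonSample();
        System.out.println(format("yyyyMMddhhmmss", when)); // 20240115030405
        System.out.println(format("yyyyMMddHHmmss", when)); // 20240115150405
    }
}
```

With the 12-hour form, files processed at 03:04:05 and at 15:04:05 would land in a folder of the same name, so the 24-hour form is usually the safer choice for folder names.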

One thing in our route definition is still not good enough: it is blocking. For each incoming “header”-file, processing waits up to the full timeout for the associated “data”-file, even if other “header”- and “data”-files are available and could be processed in parallel.

This is easy to fix by adding multi-threading to our route definition:

from("file:target/in/headers")
   .to("log:net.javaforge.blog.file.header?level=INFO") 
   .transform(body(String.class)) 
   .threads(10) 
   .process(new HeaderFileBodyProcessor("file://target/in/data", "direct:data", 60 * 1000))
   .to("mock:out");

Voilà! Now we can process header-files in 10 parallel threads, so a header that is still waiting for its data-file no longer blocks the others.
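Why the thread pool helps can be shown with the plain JDK. In the sketch below every task stands for one header that is waiting for its data-file. The method reports whether all headers got picked up while the others were still waiting. This is only an illustration of the idea under my own assumptions (class name, latches, 500 ms observation window), not what Camel's `threads()` does internally.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo: do all waiting headers get a thread, or does the first one block the rest?
class ParallelWaitSketch {

    static boolean allStarted(int headerCount, int poolSize) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        CountDownLatch started = new CountDownLatch(headerCount);
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < headerCount; i++) {
                pool.submit(() -> {
                    started.countDown(); // this header is now being processed
                    release.await();     // simulates waiting for the data-file
                    return null;
                });
            }
            // true only if every header was picked up while the others were still waiting
            return started.await(500, TimeUnit.MILLISECONDS);
        } finally {
            release.countDown();         // let the waiting tasks finish
            pool.shutdownNow();
        }
    }
}
```

With three headers, `allStarted(3, 3)` returns `true`, while `allStarted(3, 1)` returns `false`: on a single thread the first waiting header holds up the other two, which is exactly the blocking behaviour of the original route.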

 