When you implement an event-driven system with Apache Kafka, you may need to integrate with external systems that still rely on old-fashioned request-reply semantics. So you essentially need to respond to them in the same thread that received the request.

Imagine that such an external system initiates a process for checking some information about a person: document IDs, photos, name, etc. For simplicity, the check will be done by a single service: person-checker-api. In an event-driven system, services can subscribe to events, so when the external system initiates a check, other services can record the person's information from the check event, and the same applies to the check result. There is no direct reference between services.

request-reply-diagram.jpg

Our external system gateway (gateway-api) will fire a check event to the “Person.Check.Initiated” topic, and the check result will arrive in another topic, “Person.Checked”, which is populated by person-checker-api. Remember that gateway-api needs to respond synchronously.

Spring for Apache Kafka provides a ReplyingKafkaTemplate, but if you want to use it with multiple instances, you need to deal with manual partition assignment. If you run your receiving service as a single instance, it’s definitely an option.

If you have multiple receiving instances (gateway-api), you need some shared store: a database, a distributed data grid, or maybe something else. In this example I will use a database, but I recommend that you consider a data grid such as Hazelcast. Person check results will be stored in a table, so if gateway-api receives a request to check a person with the same id, you can immediately return the previous result.

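Stale data: if you return a previously saved result directly from the database, you need to take care of stale data. The stored result may no longer reflect the person's current information, so decide how long a saved result stays valid.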
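One way to handle this is to store the time of the check next to the result and ignore results older than some limit. Below is a minimal sketch of that idea. The `StoredCheckResult` class with its `checkedAt` timestamp and the `FreshnessPolicy` helper are hypothetical names used for illustration only; they are not part of the example project.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Optional;

// Hypothetical stored row: the check result plus the time it was saved.
final class StoredCheckResult {
    final int personId;
    final boolean checkResult;
    final Instant checkedAt;

    StoredCheckResult(int personId, boolean checkResult, Instant checkedAt) {
        this.personId = personId;
        this.checkResult = checkResult;
        this.checkedAt = checkedAt;
    }
}

// Hypothetical helper that decides whether a stored result may still be reused.
final class FreshnessPolicy {
    private final Duration maxAge;

    FreshnessPolicy(Duration maxAge) {
        this.maxAge = maxAge;
    }

    // Keeps the stored result only if it is not older than maxAge at time 'now'.
    Optional<StoredCheckResult> freshOnly(Optional<StoredCheckResult> stored, Instant now) {
        return stored.filter(r -> !r.checkedAt.plus(maxAge).isBefore(now));
    }
}
```

With something like this in place, the gateway could return a stored result right away when `freshOnly` yields a value, and fall back to sending a new check event when it is empty. The right `maxAge` depends on how quickly the person data can change in your domain.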

When gateway-api receives a check request, it sends an event to the “Person.Check.Initiated” topic and starts polling its own table for the check result.

    @PostMapping("/check")
    //you may want to try/catch this
    @SneakyThrows({InterruptedException.class, ExecutionException.class, TimeoutException.class})
    public Boolean checkPerson(@RequestBody Person person) {
        kafkaTemplate.send("Person.Check.Initiated", person.getId(), new PersonCheckInitiated(person));
        return pollForCheckResult(person.getId()).get(50, TimeUnit.SECONDS).getCheckResult();
    }

    private CompletableFuture<PersonCheckResult> pollForCheckResult(Integer personId) {
        CompletableFuture<PersonCheckResult> checkResultCompletableFuture = new CompletableFuture<>();
        final ScheduledFuture<?> checkResultScheduledFuture = executor.scheduleAtFixedRate(() -> {
            log.info("Checking result for person with id: {}", personId);
            Optional<PersonCheckResult> optionalCheckResult = checkResultRepository.findByPersonId(personId);
            optionalCheckResult.ifPresent(checkResultCompletableFuture::complete);
        }, 1, 1, TimeUnit.SECONDS);
        //we don't want to run this future indefinitely
        executor.schedule(() -> {
            log.info("Cancelling check for person with id: {}", personId);
            checkResultScheduledFuture.cancel(true);
        }, 65, TimeUnit.SECONDS);
        //cancel polling when result is received
        checkResultCompletableFuture.whenComplete((personCheckResult, throwable) -> checkResultScheduledFuture.cancel(true));
        return checkResultCompletableFuture;
    }

Here we use a scheduled future to periodically poll the table for changes. These changes are written by a listener on the “Person.Checked” topic:

    @KafkaListener(topics = "Person.Checked")
    public void personCheckedReceived(PersonChecked personChecked) {
        PersonCheckResult personCheckResult = new PersonCheckResult();
        personCheckResult.setPersonId(personChecked.getPersonId());
        //copy the result from the event into the entity that gets saved
        personCheckResult.setCheckResult(personChecked.getCheckResult());
        checkResultRepository.save(personCheckResult);
    }

And that’s all! Just listen to the needed topic, save the information in the shared store, and periodically check for it.
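If you want to play with the polling idea on its own, without Kafka or a database, the following sketch may help. It is not the code from the example project: an in-memory map stands in for the shared table, a scheduled task stands in for the “Person.Checked” listener, and the class name `PollingDemo` and the 100 ms interval are assumptions chosen for illustration.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

class PollingDemo {
    // Stand-in for the shared table: personId -> check result.
    static final Map<Integer, Boolean> store = new ConcurrentHashMap<>();

    // Daemon threads so that the JVM can exit even if the executor is never shut down.
    static final ScheduledExecutorService executor = Executors.newScheduledThreadPool(2, r -> {
        Thread t = new Thread(r);
        t.setDaemon(true);
        return t;
    });

    // Polls the store every 100 ms until a result for personId shows up.
    static CompletableFuture<Boolean> pollForCheckResult(int personId) {
        CompletableFuture<Boolean> result = new CompletableFuture<>();
        ScheduledFuture<?> polling = executor.scheduleAtFixedRate(
                () -> Optional.ofNullable(store.get(personId)).ifPresent(result::complete),
                0, 100, TimeUnit.MILLISECONDS);
        // Stop polling once the future is done, whether it completed normally or not.
        result.whenComplete((value, error) -> polling.cancel(false));
        return result;
    }

    public static void main(String[] args) throws Exception {
        CompletableFuture<Boolean> pending = pollForCheckResult(52);
        // Simulate the listener writing the result a bit later.
        executor.schedule(() -> store.put(52, true), 300, TimeUnit.MILLISECONDS);
        System.out.println("Check result: " + pending.get(5, TimeUnit.SECONDS));
        executor.shutdownNow();
    }
}
```

The caller still blocks on `get` with a timeout, just like the controller above, so a missing result surfaces as a `TimeoutException` rather than a request that hangs forever.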

You can check the example with an H2 database here: kafka-sync-example

To test it, you need to initiate a check:

curl -X POST http://localhost:8080/check -H 'Content-Type: application/json' -d '{"id": 52, "firstName": "Test", "lastName": "Test"}'

And emulate the check result by posting an event to the topic:

echo '{"personId": 52, "checkResult": true}' |  kafkacat -P -b localhost:9092 -t Person.Checked -p -1