This kind of endpoint is usually store-and-forward: we want to capture and handle any errors ourselves, because there's no one on the other side to react to them.
So we want to get the raw data to the message broker as quickly and safely as possible. We can always clean it up later in SQL, or with Apache Spark, Flink, or some other tool.
But at high volumes we also want to make sure that a batch never grows beyond a configured size (and we want something to test in this example). So we split large batches into smaller chunks and send those chunks to Kafka.
```java
public void eventService(String json) {
    try {
        // a list of different schemas can come in, so handle as a list of maps
        TypeReference<List<Map<String, Object>>> typeRef = new TypeReference<>() {};
        List<Map<String, Object>> list = objectMapper.readValue(json, typeRef);

        // It's nice to leave the records in batches for performance,
        // but make sure the batches are not huge
        if (list.size() <= chunkSize) {
            // send it!
            writer.write(list);
        } else {
            // split into chunks of chunkSize
            AtomicInteger counter = new AtomicInteger();
            list.stream()
                .collect(Collectors.groupingBy(i -> counter.getAndIncrement() / chunkSize))
                .values()
                .forEach(writer::write);
        }
    } catch (Exception ignore) {
        // todo - forward failures to re-queue
    }
}
```
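The chunking step can be tried in isolation. A minimal sketch, with plain integers standing in for the parsed JSON maps and a `chunkSize` of 4 assumed for illustration:

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class ChunkDemo {
    public static void main(String[] args) {
        int chunkSize = 4;
        // ten dummy records standing in for the parsed maps
        List<Integer> list = IntStream.range(0, 10).boxed().collect(Collectors.toList());

        // same groupingBy trick as above: consecutive counter values
        // map records 0-3 to key 0, 4-7 to key 1, 8-9 to key 2
        AtomicInteger counter = new AtomicInteger();
        var chunks = list.stream()
                .collect(Collectors.groupingBy(i -> counter.getAndIncrement() / chunkSize))
                .values();

        chunks.forEach(chunk -> System.out.println(chunk.size()));
        // prints 4, 4, 2 - no chunk exceeds chunkSize
    }
}
```

Note that this relies on the stream being consumed sequentially; with a parallel stream the counter values would no longer line up with record order.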
Kafka Producer
Finally we call the Kafka producer, which:
implements a local queue - batching our requests
then forwards to the message broker in batches
calls our callback function (if we set one)
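The "local queue" here is the Kafka producer's own record accumulator, which is tuned through standard producer configs such as `batch.size` and `linger.ms`. A minimal sketch of such a configuration; the property keys are real Kafka producer settings, but the broker address and the values shown are illustrative assumptions:

```java
import java.util.Properties;

public class ProducerBatchingConfig {
    public static Properties producerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        // batching: hold records for up to 20 ms, or until 64 KB
        // accumulates per partition, before sending a batch
        props.put("linger.ms", "20");
        props.put("batch.size", "65536");
        // wait for all in-sync replicas before considering a send successful
        props.put("acks", "all");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(producerProps().getProperty("linger.ms"));
    }
}
```

Larger `linger.ms` and `batch.size` values trade a little latency for fewer, bigger requests to the broker.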
```java
/*
 * Write to the Spring Kafka producer.
 * The real producer is wrapped by our KafkaTemplate bean,
 * which calls our callback once the send completes.
 */
public void write(Object payload) {
    try {
        var future = this.kafkaTemplate.send(this.topic, payload);
        future.addCallback(new KafkaCallback());
    } catch (Exception e) {
        // todo - surface the failure instead of swallowing it
    }
}
```
In this case we have implemented a callback that doesn't do much yet. It will:
report successes on a debug log
report errors on the error log
In a real implementation we would forward these errors to some sort of dead letter queue (DLQ).
```java
/**
 * KafkaCallback
 * Handle callbacks from the Kafka Producer
 */
static class KafkaCallback implements ListenableFutureCallback<SendResult<String, Object>> {

    @Override
    public void onFailure(Throwable ex) {
        log.error("FOOBAR-DLQ: {}", ex.getLocalizedMessage());
    }

    @Override
    public void onSuccess(SendResult<String, Object> result) {
        if (result == null) return;
        RecordMetadata meta = result.getRecordMetadata();
        log.debug("Delivered {}<--[{}] at partition={}, offset={}",
                meta.topic(), result.toString(), meta.partition(), meta.offset());
    }
}
```
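The DLQ forwarding mentioned above can be sketched without any framework. This is a hypothetical helper, not Spring Kafka API: the `DlqForwarder` name, the `events.DLT` topic, and the injected sender function (which would be `kafkaTemplate::send` in the real app) are all illustrative assumptions:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

public class DlqForwarder {
    // abstracts the producer so the logic is testable; in the real
    // application this would be kafkaTemplate::send
    private final BiConsumer<String, Object> sender;
    private final String dlqTopic;

    public DlqForwarder(BiConsumer<String, Object> sender, String dlqTopic) {
        this.sender = sender;
        this.dlqTopic = dlqTopic;
    }

    // called from onFailure: re-publish the failed record to the DLQ topic
    public void forward(Object failedRecord) {
        sender.accept(dlqTopic, failedRecord);
    }

    public static void main(String[] args) {
        List<String> sent = new ArrayList<>();
        DlqForwarder dlq = new DlqForwarder(
                (topic, rec) -> sent.add(topic + ":" + rec), "events.DLT");
        dlq.forward("bad-payload");
        System.out.println(sent.get(0));
        // prints events.DLT:bad-payload
    }
}
```

Keeping the sender as a function makes the failure path easy to unit test without a running broker.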