walk-through
Controller
/**
 * Routing configuration for the event endpoint.
 *
 * <p>Exposes a single router function bean that sends {@code POST /events}
 * requests to {@code EventHandler#eventHandler}.
 */
@Configuration
public class EventRouter {

    /**
     * Builds the router function for the event endpoint.
     *
     * @param handler the handler whose {@code eventHandler} method serves {@code POST /events}
     * @return a router function containing the single {@code POST /events} route
     */
    @Bean
    public RouterFunction<ServerResponse> route(EventHandler handler) {
        RouterFunctions.Builder routes = RouterFunctions.route();
        routes.POST("/events", handler::eventHandler);
        return routes.build();
    }
}
Handler
/**
 * Parses a JSON array of event records and hands the records to the writer in batches.
 *
 * <p>A payload holding at most {@code chunkSize} records is written in a single call.
 * A larger payload is split into consecutive batches of at most {@code chunkSize}
 * records each, written in the order the records appear in the payload.
 *
 * <p>Any exception raised while parsing or writing is currently swallowed (see the
 * todo in the catch block), so callers are not told about failures.
 *
 * @param json the payload, read as a JSON array of objects
 */
public void eventService(String json) {
    try {
        // a list of different schemas can come in, so handle as a list of maps
        TypeReference<List<Map<String, Object>>> typeRef = new TypeReference<>() {};
        List<Map<String, Object>> list = objectMapper.readValue(json, typeRef);

        // It's nice to leave the records in batches for performance,
        // but make sure the batches are not huge.
        int total = list.size();
        if (total <= chunkSize) {
            // small enough to send as one batch
            writer.write(list);
            return;
        }

        // A non-positive chunk size can never make progress; fail here instead of
        // looping forever. The catch below treats it like any other failure.
        if (chunkSize <= 0) {
            throw new IllegalStateException("chunkSize must be positive: " + chunkSize);
        }

        // Slice by index so batches go out in input order without a stateful
        // stream lambda. Each batch is a view over the parsed list, not a copy.
        int start = 0;
        while (start < total) {
            // widen to long so start + chunkSize cannot overflow
            int end = (int) Math.min((long) total, (long) start + chunkSize);
            writer.write(list.subList(start, end));
            start = end;
        }
    } catch (Exception ignored) {
        // todo - forward failures to re-queue
    }
}
Kafka Producer
Last updated