~/dipjyoti

Kafka

What is Apache Kafka?

Apache Kafka is a framework implementation of a software bus using stream processing. It is an open-source software platform developed by the Apache Software Foundation and written in Scala and Java. The project aims to provide a unified, high-throughput, low-latency platform for handling real-time data feeds. Behind the scenes, Kafka is distributed, scales well, replicates data across brokers (servers), and can survive broker downtime.

(Figure: Kafka distributed streaming overview)

Topics, Partitions and Offsets

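Topics: a particular stream of data, identified by its name.

Topics are split into partitions. Each partition is an ordered sequence of messages, and every message in a partition gets an incremental ID called its offset. Offsets only have meaning within a single partition, so ordering is guaranteed inside a partition but not across partitions.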
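To make the topic/partition/offset relationship concrete, here is a toy in-memory model in plain Java. It is not how Kafka stores data; the class name ToyTopic is hypothetical. Each partition is an append-only list, and a message's offset is simply its position in that list, which is why offsets are only meaningful within one partition.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy model of a topic (illustration only, not Kafka's storage engine).
class ToyTopic {
    private final String name;
    private final List<List<String>> partitions = new ArrayList<>();

    ToyTopic(String name, int partitionCount) {
        this.name = name;
        for (int i = 0; i < partitionCount; i++) {
            partitions.add(new ArrayList<>()); // each partition is an append-only log
        }
    }

    // Appends a message to one partition and returns its offset within that partition.
    long append(int partition, String message) {
        List<String> log = partitions.get(partition);
        log.add(message);
        return log.size() - 1; // offsets start at 0 and grow by 1 per message
    }

    // Reads the message stored at a (partition, offset) position.
    String read(int partition, long offset) {
        return partitions.get(partition).get((int) offset);
    }

    int partitionCount() {
        return partitions.size();
    }

    String name() {
        return name;
    }

    public static void main(String[] args) {
        ToyTopic topic = new ToyTopic("cabs_gps", 3);
        System.out.println(topic.append(0, "a")); // 0
        System.out.println(topic.append(0, "b")); // 1
        System.out.println(topic.append(1, "c")); // 0: each partition has its own offset sequence
    }
}
```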

Example scenario: you have a fleet of cabs, and each cab reports its GPS location to Kafka. A topic cabs_gps can hold the positions of all cabs. Every cab sends a message to Kafka every 20 seconds, and each message contains the cab ID and the cab's location (latitude/longitude).
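A minimal sketch of the message a cab might send, assuming (this is not stated in the scenario) that the cab ID is used as the record key. With Kafka's default partitioner, records that share a key land in the same partition as long as the partition count does not change, which keeps each cab's positions in order. The CabPosition class and the "cabId,lat,long" value format are hypothetical, and String.hashCode() is only a stand-in: Kafka's Java client hashes keys with murmur2.

```java
// Hypothetical message shape for the cabs_gps topic.
class CabPosition {
    final String cabId;
    final double lat;
    final double lon;

    CabPosition(String cabId, double lat, double lon) {
        this.cabId = cabId;
        this.lat = lat;
        this.lon = lon;
    }

    // Record key: the cab ID (an assumption made for this sketch).
    String key() {
        return cabId;
    }

    // Record value: a simple "cabId,lat,long" string (hypothetical format).
    String value() {
        return cabId + "," + lat + "," + lon;
    }

    // Stand-in partitioner using String.hashCode() instead of Kafka's murmur2 hash.
    // Masking the sign bit keeps the hash non-negative before taking the modulus.
    static int partitionFor(String key, int numPartitions) {
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        CabPosition p1 = new CabPosition("cab-42", 12.9716, 77.5946);
        CabPosition p2 = new CabPosition("cab-42", 12.9720, 77.5950);
        // Both updates from cab-42 map to the same partition, so they stay in order.
        System.out.println(partitionFor(p1.key(), 3) == partitionFor(p2.key(), 3)); // true
        System.out.println(p1.value());
    }
}
```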

Brokers & Topics

(Figures: topic A with 3 partitions; topic B with 2 partitions; Kafka topics split across brokers)
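A rough sketch of the layout described by the figures: the partitions of topic A (3) and topic B (2) spread over a cluster of three brokers. The round-robin placement and the broker IDs 101, 102 and 103 are assumptions for illustration; Kafka's actual placement logic is more involved (it also spreads replicas and can take broker racks into account). PartitionPlacement is a hypothetical name.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy round-robin placement of topic partitions onto brokers (not Kafka's actual algorithm).
class PartitionPlacement {

    // Returns "topic-partition" -> broker ID, cycling through the brokers in order.
    static Map<String, Integer> place(Map<String, Integer> partitionsPerTopic, List<Integer> brokers) {
        Map<String, Integer> placement = new LinkedHashMap<>();
        int next = 0;
        for (Map.Entry<String, Integer> topic : partitionsPerTopic.entrySet()) {
            for (int p = 0; p < topic.getValue(); p++) {
                placement.put(topic.getKey() + "-" + p, brokers.get(next % brokers.size()));
                next++;
            }
        }
        return placement;
    }

    public static void main(String[] args) {
        Map<String, Integer> topics = new LinkedHashMap<>();
        topics.put("A", 3);
        topics.put("B", 2);
        System.out.println(place(topics, List.of(101, 102, 103)));
        // {A-0=101, A-1=102, A-2=103, B-0=101, B-1=102}
    }
}
```

No single broker holds a whole topic, which is what lets a topic grow beyond one machine.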

Topics replication
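Replication can be illustrated with a small model, assuming a replication factor of 2 and a simplified placement where each partition's copies sit on consecutive brokers (Kafka's real assignment algorithm differs). The first replica acts as the leader that clients normally talk to; if the leader's broker goes down, another replica takes over so the partition stays available. In real Kafka the new leader is normally chosen from the in-sync replicas (ISR); this sketch treats every surviving replica as in sync. ReplicaPlacement and its methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Toy replica placement and leader failover (simplified; not Kafka's real algorithm).
class ReplicaPlacement {

    // Places `replicationFactor` copies of a partition on consecutive brokers, wrapping around.
    // The first broker in the returned list is the preferred leader.
    static List<Integer> replicasFor(int partition, List<Integer> brokers, int replicationFactor) {
        if (replicationFactor > brokers.size()) {
            throw new IllegalArgumentException("replication factor cannot exceed the number of brokers");
        }
        List<Integer> replicas = new ArrayList<>();
        for (int i = 0; i < replicationFactor; i++) {
            replicas.add(brokers.get((partition + i) % brokers.size()));
        }
        return replicas;
    }

    // Picks the first replica whose broker is still alive, or -1 if none survive.
    // Simplification: every surviving replica is assumed to be in sync.
    static int electLeader(List<Integer> replicas, Set<Integer> aliveBrokers) {
        for (int broker : replicas) {
            if (aliveBrokers.contains(broker)) {
                return broker;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        List<Integer> brokers = List.of(101, 102, 103);
        List<Integer> replicas = replicasFor(1, brokers, 2); // [102, 103]
        System.out.println("Replicas: " + replicas + ", leader: " + electLeader(replicas, Set.of(101, 102, 103)));
        // Broker 102 goes down: leadership moves to 103.
        System.out.println("Leader after 102 fails: " + electLeader(replicas, Set.of(101, 103)));
    }
}
```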

Producer

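import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

// Lombok's @Slf4j is a class-level annotation that generates the `log` field
@Slf4j
public class ProducerDemo {
    public static void main(String[] args) {
        String topic = "second-topic";
        String value = "hello kafka";
        String bootstrapServer = "127.0.0.1:9092";

        // Create producer properties
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // Create the producer; try-with-resources calls close() when done
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
            ProducerRecord<String, String> record = new ProducerRecord<>(topic, value);
            log.info("Sending record to topic {}", topic);

            // Send data asynchronously; the callback runs once, on success or on failure
            producer.send(record, (metadata, e) -> {
                if (e == null) {
                    log.info("Topic: {}, Partition: {}, Offset: {}, Timestamp: {}",
                            metadata.topic(), metadata.partition(), metadata.offset(), metadata.timestamp());
                } else {
                    log.error("Error while producing", e);
                }
            });

            // Block until all buffered records have been sent
            producer.flush();
        }
    }
}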

Consumer

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

@Slf4j
public class ConsumerDemo {
    public static void main(String[] args) {

        String bootstrapServer = "127.0.0.1:9092";
        String groupId = "my-sixth-application";
        String topic = "second-topic";

        // Create consumer config
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        // If this group has no committed offset yet, start from the beginning of each partition
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // Create consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);

        // Subscribe the consumer to our topic
        consumer.subscribe(Arrays.asList(topic));

        // Poll for new data (loops until the process is stopped)
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                log.info("Key: {}, Value: {}", record.key(), record.value());
                log.info("Partition: {}, Offset: {}", record.partition(), record.offset());
            }
        }
    }
}
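The consumer above joins the group my-sixth-application. When several consumers share a group ID, Kafka divides the topic's partitions among them so that each partition is assigned to exactly one consumer in the group at a time; if there are more consumers than partitions, the extra consumers sit idle. The sketch below imitates a range-style split in plain Java. The real assignment is negotiated through the group coordinator using the configured assignor (partition.assignment.strategy), so treat this only as an illustration; GroupAssignment and the member names are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy range-style assignment of one topic's partitions to the members of a consumer group.
class GroupAssignment {

    static Map<String, List<Integer>> assign(int numPartitions, List<String> members) {
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        if (members.isEmpty()) {
            return result; // nobody to assign to
        }
        List<String> sorted = new ArrayList<>(members);
        sorted.sort(null); // natural (alphabetical) order makes the split deterministic
        int perMember = numPartitions / sorted.size();
        int extra = numPartitions % sorted.size(); // the first `extra` members get one more
        int next = 0;
        for (int i = 0; i < sorted.size(); i++) {
            int count = perMember + (i < extra ? 1 : 0);
            List<Integer> owned = new ArrayList<>();
            for (int j = 0; j < count; j++) {
                owned.add(next++);
            }
            result.put(sorted.get(i), owned);
        }
        return result;
    }

    public static void main(String[] args) {
        // 3 partitions, 2 consumers in the same group
        System.out.println(assign(3, List.of("consumer-1", "consumer-2")));
        // {consumer-1=[0, 1], consumer-2=[2]}

        // 3 partitions, 4 consumers: one consumer is left idle
        System.out.println(assign(3, List.of("c1", "c2", "c3", "c4")));
        // {c1=[0], c2=[1], c3=[2], c4=[]}
    }
}
```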

ZooKeeper

Schema Registry

Avro

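Apache Avro is a data serialization system. Avro data is described by a schema written in JSON; the schema below defines a User record with a required name and two nullable fields (unions with null).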

{"namespace": "dip.avro",
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "favorite_number",  "type": ["int", "null"]},
    {"name": "favorite_color", "type": ["string", "null"]}
  ]
}
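To show what Avro writes on the wire for this schema, here is a hand-rolled encoder that follows the binary encoding rules of the Avro specification: int and long values are zig-zag varints, a string is its UTF-8 byte length (as a zig-zag varint) followed by the bytes, a union is the zero-based index of the chosen branch followed by that branch's value, null takes no bytes, and record fields are written in schema order. It is only an illustration; real code would use the Avro library, and UserEncoder is a hypothetical helper.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

// Hand-rolled Avro binary encoder for the dip.avro.User schema (illustration only).
class UserEncoder {

    // Zig-zag maps signed to unsigned (0->0, -1->1, 1->2, ...), then writes a base-128 varint.
    static void writeLong(ByteArrayOutputStream out, long n) {
        long z = (n << 1) ^ (n >> 63);
        while ((z & ~0x7FL) != 0) {
            out.write((int) ((z & 0x7F) | 0x80)); // low 7 bits plus a continuation bit
            z >>>= 7;
        }
        out.write((int) z);
    }

    // A string is its UTF-8 byte length followed by the bytes themselves.
    static void writeString(ByteArrayOutputStream out, String s) {
        byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
        writeLong(out, bytes.length);
        out.write(bytes, 0, bytes.length);
    }

    // Fields in schema order; each union writes its branch index before the value.
    static byte[] encode(String name, Integer favoriteNumber, String favoriteColor) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeString(out, name);
        if (favoriteNumber != null) {   // union ["int", "null"]: "int" is branch 0
            writeLong(out, 0);
            writeLong(out, favoriteNumber);
        } else {                        // "null" is branch 1 and carries no bytes
            writeLong(out, 1);
        }
        if (favoriteColor != null) {    // union ["string", "null"]: "string" is branch 0
            writeLong(out, 0);
            writeString(out, favoriteColor);
        } else {
            writeLong(out, 1);
        }
        return out.toByteArray();
    }

    public static void main(String[] args) {
        byte[] bytes = encode("Alyssa", 256, null);
        StringBuilder hex = new StringBuilder();
        for (byte b : bytes) {
            hex.append(String.format("%02x ", b));
        }
        System.out.println(hex.toString().trim());
        // 0c 41 6c 79 73 73 61 00 80 04 02
    }
}
```

The whole record takes 11 bytes: no field names or type tags are written, because the reader is expected to have the schema. This is one reason Avro is commonly paired with a schema registry.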

Kafka REST Proxy

