Um guia passo a passo para configurar e executar

Milhões de registros de dados estão sendo gerados todos os dias nos sistemas de computação atuais. Isso inclui suas transações financeiras, fazer um pedido ou dados do sensor do seu carro. Para processar esses eventos de streaming de dados em tempo real e mover registros de eventos de forma confiável entre diferentes sistemas corporativos, você precisa Apache Kafka.

O Apache Kafka é uma solução de streaming de dados de código aberto que lida com mais de 1 milhão de registros por segundo. Juntamente com esse alto rendimento, o Apache Kafka oferece alta escalabilidade e disponibilidade, baixa latência e armazenamento permanente.

Empresas como LinkedIn, Uber e Netflix contam com o Apache Kafka para processamento em tempo real e streaming de dados. A maneira mais fácil de começar a usar o Apache Kafka é instalá-lo e executá-lo em sua máquina local. Isso permite que você não apenas veja o servidor Apache Kafka em ação, mas também permite que você produza e consuma mensagens.

Com experiência prática em iniciar o servidor, criar tópicos e escrever código Java usando o cliente Kafka, você estará pronto para usar o Apache Kafka para atender a todas as suas necessidades de pipeline de dados.

Como baixar o Apache Kafka em sua máquina local

Você pode baixar a versão mais recente do Apache Kafka no link oficial. O conteúdo baixado será compactado no formato .tgz. Uma vez baixado, você terá que extrair o mesmo.

Se você for Linux, abra seu terminal. Em seguida, navegue até o local onde você baixou a versão compactada do Apache Kafka. Execute o seguinte comando:

tar -xzvf kafka_2.13-3.5.0.tgz

Depois que o comando for concluído, você descobrirá que um novo diretório chamado kafka_2.13-3.5.0. Navegue dentro da pasta usando:

cd kafka_2.13-3.5.0

Agora você pode listar o conteúdo desse diretório usando o comando ls.

Para usuários do Windows, você pode seguir as mesmas etapas. Se você não conseguir encontrar o comando tar, poderá usar uma ferramenta de terceiros como o WinZip para abrir o arquivo.

Como iniciar o Apache Kafka em sua máquina local

Depois de baixar e extrair o Apache Kafka, é hora de começar a executá-lo. Não possui instaladores. Você pode começar a usá-lo diretamente por meio da linha de comando ou da janela do terminal.

Antes de começar com o Apache Kafka, certifique-se de ter o Java 8+ instalado em seu sistema. O Apache Kafka requer uma instalação Java em execução.

#1. Execute o servidor Apache Zookeeper

A primeira etapa é executar o Apache Zookeeper. Você o obtém pré-baixado como parte do arquivo. É um serviço responsável por manter as configurações e fornecer sincronização para outros serviços.

Uma vez dentro do diretório onde você extraiu o conteúdo do arquivo, execute o seguinte comando:

Para usuários Linux:

bin/zookeeper-server-start.sh config/zookeeper.properties

Para usuários do Windows:

bin/windows/zookeeper-server-start.bat config/zookeeper.properties

O arquivo zookeeper.properties fornece as configurações para executar o servidor Apache Zookeeper. Você pode configurar propriedades como o diretório local onde os dados serão armazenados e a porta na qual o servidor será executado.

#2. Inicie o servidor Apache Kafka

Agora que o servidor Apache Zookeeper foi iniciado, é hora de iniciar o servidor Apache Kafka.

Abra um novo terminal ou janela de prompt de comando e navegue até o diretório onde os arquivos extraídos estão presentes. Então você pode iniciar o servidor Apache Kafka usando o comando abaixo:

  Como compartilhar seu Google Agenda

Para usuários Linux:

bin/kafka-server-start.sh config/server.properties

Para usuários do Windows:

bin/windows/kafka-server-start.bat config/server.properties

Você tem seu servidor Apache Kafka em execução. Caso deseje alterar a configuração padrão, poderá fazê-lo modificando o arquivo server.properties. Os diferentes valores estão presentes no documentação oficial.

Como usar o Apache Kafka em sua máquina local

Agora você está pronto para começar a usar o Apache Kafka em sua máquina local para produzir e consumir mensagens. Como os servidores Apache Zookeeper e Apache Kafka estão funcionando, vamos ver como você pode criar seu primeiro tópico, produzir sua primeira mensagem e consumir o mesmo.

Quais são as etapas para criar um tópico no Apache Kafka?

Antes de criar seu primeiro tópico, vamos entender o que realmente é um tópico. No Apache Kafka, um tópico é um armazenamento de dados lógicos que ajuda no streaming de dados. Pense nisso como o canal através do qual os dados são transportados de um componente para o outro.

Um tópico suporta multiprodutores e multiconsumidores – mais de um sistema pode escrever e ler de um tópico. Ao contrário de outros sistemas de mensagens, qualquer mensagem de um tópico pode ser consumida mais de uma vez. Além disso, você também pode mencionar o período de retenção de suas mensagens.

Vamos pegar o exemplo de um sistema (produtor) que produz dados para transações bancárias. E outro sistema (consumidor) consome esses dados e envia uma notificação do aplicativo para o usuário. Para facilitar isso, um tópico é necessário.

Abra um novo terminal ou janela de prompt de comando e navegue até o diretório onde você extraiu o arquivo. O seguinte comando criará um tópico chamado transações:

Para usuários Linux:

bin/kafka-topics.sh --create --topic transactions --bootstrap-server localhost:9092

Para usuários do Windows:

bin/windows/kafka-topics.bat --create --topic transactions --bootstrap-server localhost:9092

Agora você criou seu primeiro tópico e está pronto para começar a produzir e consumir mensagens.

Como produzir uma mensagem para o Apache Kafka?

Com seu tópico do Apache Kafka pronto, agora você pode produzir sua primeira mensagem. Abra um novo terminal ou janela de prompt de comando ou use a mesma que você usou para criar o tópico. Em seguida, verifique se você está no diretório adequado onde extraiu o conteúdo do arquivo. Você pode usar a linha de comando para produzir sua mensagem no tópico usando o seguinte comando:

Para usuários Linux:

bin/kafka-console-producer.sh --topic transactions --bootstrap-server localhost:9092

Para usuários do Windows:

bin/windows/kafka-console-producer.bat --topic transactions --bootstrap-server localhost:9092

Depois de executar o comando, você verá que a janela do terminal ou do prompt de comando está aguardando entrada. Escreva sua primeira mensagem e pressione Enter.

> This is a transactional record for $100

Você produziu sua primeira mensagem para o Apache Kafka em sua máquina local. Posteriormente, agora você está pronto para consumir esta mensagem.

Como consumir uma mensagem do Apache Kafka?

Desde que seu tópico tenha sido criado e você tenha produzido uma mensagem para seu tópico Kafka, agora você pode consumir essa mensagem.

O Apache Kafka permite anexar vários consumidores ao mesmo tópico. Cada consumidor pode fazer parte de um grupo de consumidores – um identificador lógico. Por exemplo, se você tiver dois serviços que precisam consumir os mesmos dados, eles podem ter grupos de consumidores diferentes.

No entanto, se você tiver duas instâncias do mesmo serviço, convém evitar consumir e processar a mesma mensagem duas vezes. Nesse caso, ambos terão o mesmo grupo de consumidores.

No terminal ou na janela do prompt de comando, verifique se você está no diretório apropriado. Use o seguinte comando para iniciar o consumidor:

Para usuários Linux:

bin/kafka-console-consumer.sh --topic transactions --from-beginning --bootstrap-server localhost:9092 --group notif-consumer

Para usuários do Windows:

bin/windows/kafka-console-consumer.bat --topic transactions --from-beginning --bootstrap-server localhost:9092 --group notif-consumer

Você verá a mensagem que produziu anteriormente aparecer em seu terminal. Agora você usou o Apache Kafka para consumir sua primeira mensagem.

  Os pássaros estão de volta e carregados com novas habilidades [Review]

O comando kafka-console-consumer recebe muitos argumentos passados. Vamos ver o que cada um deles significa:

  • O –topic menciona o tópico de onde você estará consumindo
  • –from-beginning diz ao consumidor do console para começar a ler as mensagens desde a primeira mensagem presente
  • Seu servidor Apache Kafka é mencionado por meio da opção –bootstrap-server
  • Além disso, você pode mencionar o grupo de consumidores passando o parâmetro –group
  • Na ausência de um parâmetro de grupo de consumidores, ele é gerado automaticamente

Com o consumidor do console em execução, você pode tentar produzir novas mensagens. Você verá que todos eles são consumidos e aparecem em seu terminal.

Agora que você criou seu tópico e produziu e consumiu mensagens com sucesso, vamos integrá-lo a um aplicativo Java.

Como criar produtor e consumidor Apache Kafka usando Java

Antes de começar, certifique-se de ter o Java 8+ instalado em sua máquina local. O Apache Kafka fornece sua própria biblioteca cliente que permite que você se conecte perfeitamente. Se você estiver usando o Maven para gerenciar suas dependências, adicione a seguinte dependência ao seu pom.xml

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.5.0</version>
</dependency>

Você também pode baixar a biblioteca do Repositório Maven e adicione-o ao seu caminho de classe Java.

Depois que sua biblioteca estiver pronta, abra um editor de código de sua escolha. Vamos ver como você pode iniciar seu produtor e consumidor usando Java.

Criar produtor Apache Kafka Java

Com a biblioteca kafka-clients instalada, agora você está pronto para começar a criar seu produtor Kafka.

Vamos criar uma classe chamada SimpleProducer.java. Este será responsável por produzir mensagens sobre o tema que você criou anteriormente. Dentro dessa classe, você criará uma instância de org.apache.kafka.clients.producer.KafkaProducer. Posteriormente, você usará este produtor para enviar suas mensagens.

Para criar o produtor Kafka, você precisa do host e da porta do seu servidor Apache Kafka. Como você o está executando em sua máquina local, o host será localhost. Dado que você não alterou as propriedades padrão ao inicializar o servidor, a porta será 9092. Considere o seguinte código abaixo que o ajudará a criar seu produtor:

package org.example.kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class SimpleProducer {

    private final KafkaProducer<String, String> producer;

    public SimpleProducer(String host, String port) {
        String server = host + ":" + port;
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, server);
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        this.producer = new KafkaProducer<>(properties);
    }
}

Você notará que há três propriedades sendo definidas. Vamos percorrer rapidamente cada um deles:

  • O BOOTSTRAP_SERVERS_CONFIG permite definir onde o servidor Apache Kafka está sendo executado
  • O KEY_SERIALIZER_CLASS_CONFIG informa ao produtor qual formato usar para enviar as chaves de mensagem.
  • O formato para enviar a mensagem real é definido usando a propriedade VALUE_SERIALIZER_CLASS_CONFIG.

Como você enviará mensagens de texto, ambas as propriedades são definidas para usar StringSerializer.class.

Para realmente enviar uma mensagem para o seu tópico, você precisa usar o método Producer.send() que recebe um ProducerRecord. O código a seguir fornece um método que enviará uma mensagem para o tópico e imprimirá a resposta junto com o deslocamento da mensagem.

public void produce(String topic, String message) throws ExecutionException, InterruptedException {
    ProducerRecord<String, String> record = new ProducerRecord<>(topic, message);
    final Future<RecordMetadata> send = this.producer.send(record);
    final RecordMetadata recordMetadata = send.get();
    System.out.println(recordMetadata);
}

Com todo o código pronto, agora você pode enviar mensagens para o seu tópico. Você pode usar um método main para testar isso, conforme apresentado no código abaixo:

package org.example.kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class SimpleProducer {

    private final KafkaProducer<String, String> producer;

    public SimpleProducer(String host, String port) {
        String server = host + ":" + port;
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, server);
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        this.producer = new KafkaProducer<>(properties);
    }

    public void produce(String topic, String message) throws ExecutionException, InterruptedException {
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, message);
        final Future<RecordMetadata> send = this.producer.send(record);
        final RecordMetadata recordMetadata = send.get();
        System.out.println(recordMetadata);
    }

    public static void main(String[] args) throws Exception{
       SimpleProducer producer = new SimpleProducer("localhost", "9092");
       producer.produce("transactions", "This is a transactional record of $200");
    }
}

Neste código, você está criando um SimpleProducer que se conecta ao seu servidor Apache Kafka em sua máquina local. Ele usa internamente o KafkaProducer para produzir mensagens de texto sobre o seu tópico.

  Conheça esses jargões antes de fazer qualquer coisa criptográfica

Criar consumidor Apache Kafka Java

É hora de criar um consumidor Apache Kafka usando o cliente Java. Crie uma classe chamada SimpleConsumer.java. Em seguida, você criará um construtor para essa classe, que inicializa o org.apache.kafka.clients.consumer.KafkaConsumer. Para criar o consumidor, você precisa do host e da porta onde o servidor Apache Kafka é executado. Além disso, você precisa do Grupo de consumidores e do tópico do qual deseja consumir. Use o trecho de código fornecido abaixo:

package org.example.kafka;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;

public class SimpleConsumer {

    private static final String OFFSET_RESET = "earliest";

    private final KafkaConsumer<String, String> consumer;
    private boolean keepConsuming = true;

    public SimpleConsumer(String host, String port, String consumerGroupId, String topic) {
        String server = host + ":" + port;
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, server);
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, OFFSET_RESET);
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        this.consumer = new KafkaConsumer<>(properties);
        this.consumer.subscribe(List.of(topic));
    }
}

Semelhante ao Kafka Producer, o Kafka Consumer também recebe um objeto Properties. Vejamos todos os diferentes conjuntos de propriedades:

  • BOOTSTRAP_SERVERS_CONFIG informa ao consumidor onde o servidor Apache Kafka está sendo executado
  • O grupo de consumidores é mencionado usando o GROUP_ID_CONFIG
  • Quando o consumidor começa a consumir, o AUTO_OFFSET_RESET_CONFIG permite que você mencione a quanto tempo deseja começar a consumir mensagens de
  • KEY_DESERIALIZER_CLASS_CONFIG informa ao consumidor o tipo da chave de mensagem
  • VALUE_DESERIALIZER_CLASS_CONFIG informa o tipo de consumidor da mensagem real

Como, no seu caso, você consumirá mensagens de texto, as propriedades do desserializador são definidas como StringDeserializer.class.

Agora você estará consumindo as mensagens do seu tópico. Para manter as coisas simples, assim que a mensagem for consumida, você imprimirá a mensagem no console. Vamos ver como você pode conseguir isso usando o código abaixo:

private boolean keepConsuming = true;

public void consume() {
    while (keepConsuming) {
        final ConsumerRecords<String, String> consumerRecords = this.consumer.poll(Duration.ofMillis(100L));
        if (consumerRecords != null && !consumerRecords.isEmpty()) {
            consumerRecords.iterator().forEachRemaining(consumerRecord -> {
                System.out.println(consumerRecord.value());
            });
        }
    }
}

Este código continuará pesquisando o tópico. Ao receber qualquer Cadastro de Consumidor, a mensagem será impressa. Teste seu consumidor em ação usando um método principal. Você iniciará um aplicativo Java que continuará consumindo o tópico e imprimindo as mensagens. Pare o aplicativo Java para encerrar o consumidor.

package org.example.kafka;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;

public class SimpleConsumer {

    private static final String OFFSET_RESET = "earliest";

    private final KafkaConsumer<String, String> consumer;
    private boolean keepConsuming = true;

    public SimpleConsumer(String host, String port, String consumerGroupId, String topic) {
        String server = host + ":" + port;
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, server);
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, OFFSET_RESET);
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        this.consumer = new KafkaConsumer<>(properties);
        this.consumer.subscribe(List.of(topic));
    }

    public void consume() {
        while (keepConsuming) {
            final ConsumerRecords<String, String> consumerRecords = this.consumer.poll(Duration.ofMillis(100L));
            if (consumerRecords != null && !consumerRecords.isEmpty()) {
                consumerRecords.iterator().forEachRemaining(consumerRecord -> {
                    System.out.println(consumerRecord.value());
                });
            }
        }
    }

    public static void main(String[] args) {
        SimpleConsumer simpleConsumer = new SimpleConsumer("localhost", "9092", "transactions-consumer", "transactions");
        simpleConsumer.consume();
    }
}

Ao executar o código, você observará que ele consome não apenas a mensagem produzida pelo seu produtor Java, mas também as que você produziu por meio do Console Producer. Isso ocorre porque a propriedade AUTO_OFFSET_RESET_CONFIG foi definida como mais cedo.

Com o SimpleConsumer em execução, você pode usar o produtor do console ou o aplicativo SimpleProducer Java para produzir mensagens adicionais para o tópico. Você os verá sendo consumidos e impressos no console.

Atenda a todas as suas necessidades de pipeline de dados com o Apache Kafka

O Apache Kafka permite que você lide com todos os seus requisitos de pipeline de dados com facilidade. Com a configuração do Apache Kafka em sua máquina local, você pode explorar todos os diferentes recursos que o Kafka oferece. Além disso, o cliente Java oficial permite que você escreva, conecte e se comunique com seu servidor Apache Kafka de forma eficiente.

Sendo um sistema de streaming de dados versátil, escalável e de alto desempenho, o Apache Kafka pode realmente ser uma virada de jogo para você. Você pode usá-lo para o seu desenvolvimento local ou até mesmo integrá-lo aos seus sistemas de produção. Assim como é fácil configurar localmente, configurar o Apache Kafka para aplicativos maiores não é uma tarefa difícil.

Se você estiver procurando por plataformas de streaming de dados, poderá consultar as melhores plataformas de dados de streaming para análise e processamento em tempo real.