En steg-för-steg-guide för att installera och köra

Miljontals dataposter genereras varje dag i dagens datorsystem. Dessa inkluderar dina finansiella transaktioner, beställning eller data från din bilsensor. För att bearbeta dessa dataströmningshändelser i realtid och för att på ett tillförlitligt sätt flytta händelseposter mellan olika företagssystem behöver du Apache Kafka.

Apache Kafka är en dataströmningslösning med öppen källkod som hanterar över 1 miljon poster per sekund. Utöver denna höga genomströmning ger Apache Kafka hög skalbarhet och tillgänglighet, låg latens och permanent lagring.

Företag som LinkedIn, Uber och Netflix förlitar sig på Apache Kafka för realtidsbearbetning och dataströmning. Det enklaste sättet att komma igång med Apache Kafka är att ha det igång på din lokala dator. Detta låter dig inte bara se Apache Kafka-servern i aktion utan låter dig också producera och konsumera meddelanden.

Med praktisk erfarenhet av att starta servern, skapa ämnen och skriva Java-kod med Kafka-klienten, är du redo att använda Apache Kafka för att uppfylla alla dina datapipelinebehov.

Hur man laddar ner Apache Kafka på din lokala dator

Du kan ladda ner den senaste versionen av Apache Kafka från officiell länk. Det nedladdade innehållet kommer att komprimeras i .tgz-format. När du har laddat ner måste du extrahera densamma.

Om du är Linux, öppna din terminal. Navigera sedan till platsen där du har laddat ner den komprimerade Apache Kafka-versionen. Kör följande kommando:

tar -xzvf kafka_2.13-3.5.0.tgz

När kommandot är klart, kommer du att upptäcka att en ny katalog som heter kafka_2.13-3.5.0. Navigera in i mappen med:

cd kafka_2.13-3.5.0

Du kan nu lista innehållet i den här katalogen med kommandot ls.

För Windows-användare kan du följa samma steg. Om du inte kan hitta kommandot tar kan du använda ett tredjepartsverktyg som WinZip för att öppna arkivet.

Hur man startar Apache Kafka på din lokala maskin

När du har laddat ner och extraherat Apache Kafka är det dags att börja köra det. Den har inga installatörer. Du kan börja använda det direkt via din kommandorad eller terminalfönster.

Innan du börjar med Apache Kafka, se till att du har Java 8+ installerat på ditt system. Apache Kafka kräver en Java-installation som körs.

#1. Kör Apache Zookeeper-servern

Det första steget är att köra Apache Zookeeper. Du får den förnedladdad som en del av arkivet. Det är en tjänst som ansvarar för att underhålla konfigurationer och tillhandahålla synkronisering för andra tjänster.

När du är inne i katalogen där du har extraherat innehållet i arkivet, kör följande kommando:

För Linux-användare:

bin/zookeeper-server-start.sh config/zookeeper.properties

För Windows-användare:

bin/windows/zookeeper-server-start.bat config/zookeeper.properties

Filen zookeeper.properties tillhandahåller konfigurationerna för att köra Apache Zookeeper-servern. Du kan konfigurera egenskaper som den lokala katalogen där data kommer att lagras och porten som servern kommer att köras på.

#2. Starta Apache Kafka-servern

Nu när Apache Zookeeper-servern har startats är det dags att starta Apache Kafka-servern.

Öppna ett nytt terminal- eller kommandotolksfönster och navigera till katalogen där de extraherade filerna finns. Sedan kan du starta Apache Kafka-servern med kommandot nedan:

För Linux-användare:

bin/kafka-server-start.sh config/server.properties

För Windows-användare:

bin/windows/kafka-server-start.bat config/server.properties

Du har din Apache Kafka-server igång. Om du vill ändra standardkonfigurationen kan du göra det genom att ändra filen server.properties. De olika värdena finns i officiell dokumentation.

Hur man använder Apache Kafka på din lokala maskin

Du är nu redo att börja använda Apache Kafka på din lokala dator för att producera och konsumera meddelanden. Eftersom Apache Zookeeper och Apache Kafka-servrarna är igång, låt oss se hur du kan skapa ditt första ämne, producera ditt första meddelande och konsumera detsamma.

Vilka är stegen för att skapa ett ämne i Apache Kafka?

Innan du skapar ditt första ämne, låt oss förstå vad ett ämne faktiskt är. I Apache Kafka är ett ämne ett logiskt datalager som hjälper till med dataströmning. Se det som kanalen genom vilken data transporteras från en komponent till en annan.

Ett ämne stöder flera producenter och multikonsumenter – mer än ett system kan skriva och läsa från ett ämne. Till skillnad från andra meddelandesystem kan alla meddelanden från ett ämne konsumeras mer än en gång. Dessutom kan du också nämna lagringsperioden för dina meddelanden.

Låt oss ta exemplet med ett system (producent) som producerar data för banktransaktioner. Och ett annat system (konsument) förbrukar denna data och skickar ett appmeddelande till användaren. För att underlätta detta krävs ett ämne.

Öppna ett nytt terminal- eller kommandotolksfönster och navigera till katalogen där du har extraherat arkivet. Följande kommando kommer att skapa ett ämne som heter transaktioner:

För Linux-användare:

bin/kafka-topics.sh --create --topic transactions --bootstrap-server localhost:9092

För Windows-användare:

bin/windows/kafka-topics.bat --create --topic transactions --bootstrap-server localhost:9092

Du har nu skapat ditt första ämne och du är redo att börja producera och konsumera meddelanden.

Hur producerar man ett meddelande till Apache Kafka?

Med ditt Apache Kafka-ämne redo kan du nu skapa ditt första meddelande. Öppna ett nytt terminal- eller kommandotolksfönster, eller använd samma som du har använt för att skapa ämnet. Se sedan till att du är i rätt katalog där du har extraherat innehållet i arkivet. Du kan använda kommandoraden för att skapa ditt meddelande om ämnet med följande kommando:

För Linux-användare:

bin/kafka-console-producer.sh --topic transactions --bootstrap-server localhost:9092

För Windows-användare:

bin/windows/kafka-console-producer.bat --topic transactions --bootstrap-server localhost:9092

När du kör kommandot ser du att din terminal eller kommandotolksfönstret väntar på inmatning. Skriv ditt första meddelande och tryck på Enter.

> This is a transactional record for $100

Du har producerat ditt första meddelande till Apache Kafka på din lokala dator. Därefter är du nu redo att använda detta meddelande.

Hur konsumerar man ett meddelande från Apache Kafka?

Förutsatt att ditt ämne har skapats och du har skapat ett meddelande till ditt Kafka-ämne, kan du nu konsumera det meddelandet.

Apache Kafka låter dig koppla flera konsumenter till samma ämne. Varje konsument kan ingå i en konsumentgrupp – en logisk identifierare. Om du till exempel har två tjänster som behöver konsumera samma data, så kan de ha olika konsumentgrupper.

Men om du har två instanser av samma tjänst vill du undvika att konsumera och bearbeta samma meddelande två gånger. I så fall kommer båda att ha samma konsumentgrupp.

Se till att du är i rätt katalog i terminal- eller kommandotolksfönstret. Använd följande kommando för att starta konsumenten:

För Linux-användare:

bin/kafka-console-consumer.sh --topic transactions --from-beginning --bootstrap-server localhost:9092 --group notif-consumer

För Windows-användare:

bin/windows/kafka-console-consumer.bat --topic transactions --from-beginning --bootstrap-server localhost:9092 --group notif-consumer

Du kommer att se meddelandet som du tidigare har skapat visas på din terminal. Du har nu använt Apache Kafka för att konsumera ditt första meddelande.

Kommandot kafka-console-consumer tar många argument som skickas in. Låt oss se vad var och en av dem betyder:

  • –ämnet nämner ämnet där du kommer att konsumera
  • –from-beginning säger åt konsolkonsumenten att börja läsa meddelanden direkt från det första meddelandet
  • Din Apache Kafka-server nämns via alternativet –bootstrap-server
  • Dessutom kan du nämna konsumentgruppen genom att skicka parametern –group
  • I avsaknad av en konsumentgruppsparameter genereras den automatiskt

Med konsolkonsumenten igång kan du prova att skapa nya meddelanden. Du kommer att se att alla är förbrukade och dyker upp i din terminal.

Nu när du har skapat ditt ämne och framgångsrikt producerat och konsumerat meddelanden, låt oss integrera detta med en Java-applikation.

Hur man skapar Apache Kafka producent och konsument med Java

Innan du börjar, se till att du har Java 8+ installerat på din lokala dator. Apache Kafka tillhandahåller sitt eget klientbibliotek som låter dig ansluta sömlöst. Om du använder Maven för att hantera dina beroenden, lägg sedan till följande beroende till din pom.xml

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.5.0</version>
</dependency>

Du kan också ladda ner biblioteket från Maven-förvaret och lägg till den i din Java-klassväg.

När ditt bibliotek är på plats öppnar du en valfri kodredigerare. Låt oss se hur du kan starta upp din producent och konsument med Java.

Skapa Apache Kafka Java-producent

Med kafka-klientbiblioteket på plats är du nu redo att börja skapa din Kafka-producent.

Låt oss skapa en klass som heter SimpleProducer.java. Detta kommer att vara ansvarigt för att producera meddelanden om ämnet som du har skapat tidigare. Inuti den här klassen kommer du att skapa en instans av org.apache.kafka.clients.producer.KafkaProducer. Därefter kommer du att använda den här producenten för att skicka dina meddelanden.

För att skapa Kafka-producenten behöver du värden och porten för din Apache Kafka-server. Eftersom du kör det på din lokala dator kommer värden att vara localhost. Med tanke på att du inte har ändrat standardegenskaperna när du startar servern kommer porten att vara 9092. Tänk på följande kod nedan som hjälper dig att skapa din producent:

package org.example.kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class SimpleProducer {

    private final KafkaProducer<String, String> producer;

    public SimpleProducer(String host, String port) {
        String server = host + ":" + port;
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, server);
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        this.producer = new KafkaProducer<>(properties);
    }
}

Du kommer att märka att det finns tre egenskaper som ställs in. Låt oss snabbt gå igenom var och en av dem:

  • BOOTSTRAP_SERVERS_CONFIG låter dig definiera var Apache Kafka-servern körs
  • KEY_SERIALIZER_CLASS_CONFIG talar om för producenten vilket format som ska användas för att skicka meddelandenycklarna.
  • Formatet för att skicka det faktiska meddelandet definieras med egenskapen VALUE_SERIALIZER_CLASS_CONFIG.

Eftersom du kommer att skicka textmeddelanden är båda egenskaperna inställda på att använda StringSerializer.class.

För att faktiskt skicka ett meddelande till ditt ämne måste du använda metoden producer.send() som tar in ett ProducerRecord. Följande kod ger dig en metod som skickar ett meddelande till ämnet och skriver ut svaret tillsammans med meddelandeförskjutningen.

public void produce(String topic, String message) throws ExecutionException, InterruptedException {
    ProducerRecord<String, String> record = new ProducerRecord<>(topic, message);
    final Future<RecordMetadata> send = this.producer.send(record);
    final RecordMetadata recordMetadata = send.get();
    System.out.println(recordMetadata);
}

Med hela koden på plats kan du nu skicka meddelanden till ditt ämne. Du kan använda en huvudmetod för att testa detta, som presenteras i koden nedan:

package org.example.kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class SimpleProducer {

    private final KafkaProducer<String, String> producer;

    public SimpleProducer(String host, String port) {
        String server = host + ":" + port;
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, server);
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        this.producer = new KafkaProducer<>(properties);
    }

    public void produce(String topic, String message) throws ExecutionException, InterruptedException {
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, message);
        final Future<RecordMetadata> send = this.producer.send(record);
        final RecordMetadata recordMetadata = send.get();
        System.out.println(recordMetadata);
    }

    public static void main(String[] args) throws Exception{
       SimpleProducer producer = new SimpleProducer("localhost", "9092");
       producer.produce("transactions", "This is a transactional record of $200");
    }
}

I den här koden skapar du en SimpleProducer som ansluter till din Apache Kafka-server på din lokala dator. Den använder internt KafkaProducer för att producera textmeddelanden om ditt ämne.

Skapa Apache Kafka Java-konsument

Det är dags att göra en Apache Kafka-konsument med hjälp av Java-klienten. Skapa en klass som heter SimpleConsumer.java. Därefter skapar du en konstruktor för den här klassen, som initierar org.apache.kafka.clients.consumer.KafkaConsumer. För att skapa konsumenten behöver du värden och porten där Apache Kafka-servern körs. Dessutom behöver du konsumentgruppen samt ämnet du vill konsumera från. Använd kodavsnittet nedan:

package org.example.kafka;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;

public class SimpleConsumer {

    private static final String OFFSET_RESET = "earliest";

    private final KafkaConsumer<String, String> consumer;
    private boolean keepConsuming = true;

    public SimpleConsumer(String host, String port, String consumerGroupId, String topic) {
        String server = host + ":" + port;
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, server);
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, OFFSET_RESET);
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        this.consumer = new KafkaConsumer<>(properties);
        this.consumer.subscribe(List.of(topic));
    }
}

I likhet med Kafka-producenten tar Kafka-konsumenten även in ett objekt av typen Egenskaper. Låt oss titta på alla de olika egenskaperna:

  • BOOTSTRAP_SERVERS_CONFIG talar om för konsumenten var Apache Kafka-servern körs
  • Konsumentgruppen nämns med hjälp av GROUP_ID_CONFIG
  • När konsumenten börjar konsumera låter AUTO_OFFSET_RESET_CONFIG dig nämna hur långt tillbaka du vill börja konsumera meddelanden från
  • KEY_DESERIALIZER_CLASS_CONFIG berättar för konsumenten vilken typ av meddelandenyckel
  • VALUE_DESERIALIZER_CLASS_CONFIG talar om för konsumenttypen för det faktiska meddelandet

Eftersom du i ditt fall kommer att konsumera textmeddelanden är egenskaperna för deserializer inställda på StringDeserializer.class.

Du kommer nu att konsumera meddelanden från ditt ämne. För att göra saker enkelt, när meddelandet är förbrukat, kommer du att skriva ut meddelandet till konsolen. Låt oss se hur du kan uppnå detta med koden nedan:

private boolean keepConsuming = true;

public void consume() {
    while (keepConsuming) {
        final ConsumerRecords<String, String> consumerRecords = this.consumer.poll(Duration.ofMillis(100L));
        if (consumerRecords != null && !consumerRecords.isEmpty()) {
            consumerRecords.iterator().forEachRemaining(consumerRecord -> {
                System.out.println(consumerRecord.value());
            });
        }
    }
}

Den här koden kommer att fortsätta att fråga om ämnet. När du får någon konsumentpost kommer meddelandet att skrivas ut. Testa din konsument i aktion med hjälp av en huvudmetod. Du startar en Java-applikation som kommer att fortsätta att konsumera ämnet och skriva ut meddelandena. Stoppa Java-applikationen för att avsluta konsumenten.

package org.example.kafka;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;

public class SimpleConsumer {

    private static final String OFFSET_RESET = "earliest";

    private final KafkaConsumer<String, String> consumer;
    private boolean keepConsuming = true;

    public SimpleConsumer(String host, String port, String consumerGroupId, String topic) {
        String server = host + ":" + port;
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, server);
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, OFFSET_RESET);
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        this.consumer = new KafkaConsumer<>(properties);
        this.consumer.subscribe(List.of(topic));
    }

    public void consume() {
        while (keepConsuming) {
            final ConsumerRecords<String, String> consumerRecords = this.consumer.poll(Duration.ofMillis(100L));
            if (consumerRecords != null && !consumerRecords.isEmpty()) {
                consumerRecords.iterator().forEachRemaining(consumerRecord -> {
                    System.out.println(consumerRecord.value());
                });
            }
        }
    }

    public static void main(String[] args) {
        SimpleConsumer simpleConsumer = new SimpleConsumer("localhost", "9092", "transactions-consumer", "transactions");
        simpleConsumer.consume();
    }
}

När du kör koden ser du att den inte bara förbrukar meddelandet som produceras av din Java-producent utan även de som du har producerat via Console Producer. Detta beror på att egenskapen AUTO_OFFSET_RESET_CONFIG har ställts in på tidigast.

Med SimpleConsumer igång kan du använda konsolproducenten eller SimpleProducer Java-applikationen för att skapa ytterligare meddelanden till ämnet. Du kommer att se dem konsumeras och skrivas ut på konsolen.

Tillgodose alla dina datapipelinebehov med Apache Kafka

Apache Kafka låter dig hantera alla dina datapipelinekrav med lätthet. Med Apache Kafka-installationen på din lokala dator kan du utforska alla de olika funktioner som Kafka tillhandahåller. Dessutom låter den officiella Java-klienten dig skriva, ansluta och kommunicera effektivt med din Apache Kafka-server.

Eftersom det är ett mångsidigt, skalbart och högpresterande dataströmningssystem kan Apache Kafka verkligen vara en spelväxlare för dig. Du kan använda den för din lokala utveckling eller till och med integrera den i dina produktionssystem. Precis som det är lätt att ställa in lokalt är det ingen stor uppgift att ställa in Apache Kafka för större applikationer.

Om du letar efter dataströmningsplattformar kan du titta på de bästa streamingdataplattformarna för realtidsanalys och bearbetning.