Apache Kafka

Lesezeit: 13 Minuten

Apache Kafka ist ein verteiltes System zur Verarbeitung von Datenströmen, das auf hohen Datendurchsatz, geringe Latenzen und Echtzeitverarbeitung optimiert ist. Dafür setzt es auf ein TCP-basiertes binäres und auf Effizienz optimiertes Protokoll, das Nachrichten gruppiert, um den Datenoverhead zu minimieren.

Apache Kafka wurde ursprünglich bei LinkedIn entwickelt und 2011 an die Apache Software Foundation übergeben und quelloffen gemacht. Es ist in Java und Scala geschrieben und nach dem bekannten Autor Franz Kafka benannt.

I thought that since Kafka was a system optimized for writing using a writer’s name would make sense. I had taken a lot of lit classes in college and liked Franz Kafka. Plus the name sounded cool for an open source project.

Jay Kreps

Grundlagen

Kafka ist in ein Publish-Subscribe Messaging System. Nachrichten, die mit anderen Systemen geteilt werden sollen, werden in Kafka publiziert. Interessierte Clients abonnieren einen Strom von Nachrichten und können diese unabhängig voneinander nahezu in Echtzeit verarbeiten. Dienste, die Nachrichten in Kafka veröffentlichen, werden Producer genannt. Dienste, die Nachrichten in Kafka abonnieren, werden Consumer genannt. Ein Dienst wie eine Spring-Boot-Anwendung kann gleichzeitig Producer und Consumer sein.

Nachrichten

Eine Nachricht kann einem primitiven Wert wie einem String in Java entsprechen, oder ein serialisiertes Objekt beinhalten. Üblicherweise werden Java-Objekte als JSON-Strukturen in Kafka-Nachrichten abgebildet.

Eine Nachricht besteht aus einem Key und einem Value. Der eigentliche Inhalt der Nachricht steht im Value, während der Key verwendet werden kann, um eine Nachricht eindeutig zu identifizieren.

Topic

Ein Topic ist ein Strom von Nachrichten. Eine Nachricht wird in einem Topic abgelegt. Damit wird eine logische Trennung ermöglicht. Ein Topic in Kafka kann vereinfacht gesagt wie eine Tabelle in einer relationalen Datenbank verstanden werden.

Ein Topic hat einen Besitzer, der Nachrichten in demselbigen veröffentlicht, und viele Abonnenten, die an diesen Nachrichten interessiert sind und sie verarbeiten. Es ist auch möglich, meines Erachtens aber eher unüblich, dass mehrere Dienste Daten in dasselbe Topic schreiben.

Da Nachrichten in der Regel zwischen verschiedenen Services und Teams ausgetauscht werden, sollten sie als öffentliche Schnittstelle (API) verstanden werden. Die Struktur einer Nachricht muss Sender und Empfängern bekannt sein. Änderungen an der Struktur müssen kommuniziert und mit Bedacht umgesetzt werden. Für einen Übergangszeitraum sollten Empfänger dabei in der Lage sein, sowohl das alte als auch neue Format zu verstehen. Es ist möglich, in einem Topic Nachrichten verschiedener Strukturen zu verwalten. Sowohl im Producer als auch Consumer werden Serialisierer, Deserialisierer und ein Type Mapping konfiguriert, um die Übersetzung einer Nachricht in die Instanz einer Klasse und andersherum zu bewerkstelligen.

Es empfiehlt sich, für Kafka Topics eine Namenskonvention zu definieren. Ein Topic-Name wie „bestellung“ dürfte in den meisten Fällen unpassend sein. Die Konvention und die Anzahl der Informationen in einem Topic-Namen hängen auch davon ab, wie weitläufig eine Kafka-Instanz im Einsatz ist. Wird ein Cluster für eine gesamte Organisation verwendet, sollten die Namen wesentlich spezifischer sein, als wenn der Cluster nur einer Handvoll Teams dient. Folgende Informationen könnten beispielsweise Bestandteil eines Topic-Namens sein:

  • der Name der Umgebung, wenn z.B. ein gemeinsames Cluster für eine Entwicklungs- und eine Testumgebung verwendet wird
  • der Name einer Organisationseinheit, z.B. resort-bereich-abteilung
  • der Name eines Produktes oder Teams
  • der Name einer Entität. z.B. „bestellung“ oder „artikel“
  • der Name eines Ereignisses, z.B. „deleted“ oder „created“

Broker

Die Serverkomponente von Kafka, mit der sich Producer und Consumer verbinden, nennt sich Kafka Broker. Mehrere Kafka Broker formen im Verbund einen Kafka Cluster.

Partition

Jedes Topic hat mindestens eine Partition. Die Anzahl der Partitionen wird bei Anlage des Topics angegeben. In der Regel ist es eine gute Idee, mehrere Partitionen für ein Topic zu verwenden. Die Schreiboperationen in die Partitionen werden auf die Kafka Broker verteilt. Ein Broker kann sowohl für eine Partition als auch für mehrere Partitionen eines Topics als Partition Leader fungieren. Fällt ein Broker in einer Leader-Rolle aus, übernimmt ein anderer Broker diese Aufgabe.

Neue Nachrichten werden am Ende einer Partition in ein Topic eingefügt. Innerhalb einer Partition wird eine Nachricht dabei eindeutig durch ein Offset identfiziert, welches in einer leeren Partition mit Null beginnt. Für die Zuweisung einer Nachricht an eine Partition wird eine Partitionsstrategie verwendet. Per Default bestimmt der Hashwert des Nachrichten-Keys, in welche Partition diese gelangt. Ist der Key null, erfolgt die Zuweisung nach dem Round-Robin-Verfahren.

Nachrichten haben innerhalb einer Partition eine eindeutige Reihenfolge, aber nicht über Partitionen hinweg. Partitionen fördern die horizontale Skalierbarkeit eines Kafka Clusters. In einem Cluster mit zehn Brokern können in ein Topic mit zehn Partitionen potenziell bis zu zehnmal so schnell Nachrichten publiziert werden wie in ein Topic mit einer Partition.

Replica

Bei der Anlage eines Topics wird ein Replication Factor spezifizert. Bei einem Wert von eins wird die Partition nicht repliziert, bei höheren Werten werden entsprechend viele Replicas angelegt. Replicas sind Kopien von Partitionen, die durchgehend aktuell gehalten werden. Replicas sind gleichmäßig auf die Broker verteilt. Üblicherweise sollte der Replication Factor die Anzahl der Broker insgesamt nicht überschreiten, da sich die Ausfallsicherheit dadurch nicht mehr weiter verbessert, aber zusätzlicher Platz beansprucht wird. Die Aufgabe der Replication übernehmen Broker, die nicht Leader für die jeweilige Partition sind und damit als Follower dienen.

Consumer Group

Damit mehrere Consumer sich neue Nachrichten in einem Topic nicht gegenseitig „weglesen“, ordnen sie sich Consumer Groups zu. Consumer Groups merken sich die Offsets in den Partitionen für einen oder mehrere zusammenarbeitende Consumer und stellen damit sicher, dass diese zum Beispiel nach einem Neustart nicht wieder alle Nachrichten erneut verarbeiten. Consumer Groups ermöglichen Consumern auch eine horizontale Skalierung. Zehn Instanzen eines Services können sich unter der gleichen Consumer Group mit Kafka verbinden, um neue Nachrichten parallel zu verarbeiten. Die Consumer werden dabei gleichmäßig auf die verschiedenen Partitionen und Replicas verteilt.

Fehlerbehandlung

Kafka kann sowohl für Producer als auch Consumer weitreichend konfiguriert werden. Es ist beispielsweise möglich, gelesene Nachrichten automatisch zu bestätigen oder eine manuelle Bestätigung zu erzwingen. Wird eine gelesene Nachricht nicht bestätigt, verändert sich das Partition Offset für die Consumer Group nicht und die Nachricht wird erneut verarbeitet.

Mit Kafka lässt sich auch das Dead Letter Pattern umsetzen. Über Spring Kafka kann via der Annotationen @RetryableTopic und @DltHandler eine Fehlerbehandlung für nicht verarbeitbare Nachrichten implementiert werden. Eine Nachricht wird dabei nach einer definierten Anzahl Verarbeitungsversuchen in ein automatisch erzeugtes Dead Letter Topic verschoben und kann von dort getrennt verarbeitet werden. Es ist zum Beispiel vorstellbar, dass die Deserialisierung durch eine nicht abgestimmte Formatänderung der Nachricht fehlschlägt. Der Dead Letter Handler kann diese Nachricht z.B. als String empfangen und inklusive des Keys im Log protokollieren, um das Team über eine nicht verarbeitbare Nachricht zu informieren.

Log Retention

Per Log Retention wird konfiguriert, wie lange Nachrichten für ein Topic bestehen bleiben. Solange eine Nachricht besteht, kann ein Consumer sie beliebig oft neu verarbeiten, indem die Partition Offsets auf vorherige Werte zurückgesetzt werden. Die Log Retention kann als Default im Kafka Broker konfiguriert und für ein bestimmtes Topic explizit überschrieben werden. Standardmäßig werden Nachrichten 168 Stunden, also sieben Tage, aufbewahrt.

Neben der Zeit als Kriterium kann auch eine maximale Größe für ein Topic festgelegt werden. Wird diese überschritten, so werden die ältesten Nachrichten automatisch gelöscht. Eine Konfiguration kann die Faktoren Zeit und Größe auch kombinieren, so dass Nachrichten für ein Topic beispielsweise maximal 14 Tage bis zu einer Gesamtgröße des Topics von 500 MB vorgehalten werden. Ist eine Nachricht älter als 14 Tage, wird sie gelöscht. Überschreitet das Topic den Speicherbedarf von 500 MB, so wird die älteste Nachricht gelöscht, bis das Topic den Speicherbedarf nicht mehr überschreitet.

Log Compaction

In einem Szenario, in dem über Kafka Stammdatenänderungen in Form von vollständigen Datensätzen publiziert werden, sind vorherige Stände der gleichen Entität in der Regel uninteressant, da das neueste Update alle relevanten Informationen beinhaltet. In diesem Fall kann Log Compaction für ein Topic aktiviert werden. Diese Einstellung sorgt dafür, dass ältere Versionen eines Datensatzes automatisch aufgeräumt werden. Damit dies funktioniert, muss ein Datensatz eindeutig über den Nachrichten-Key identifizierbar sein. Neue Informationen werden über den gleichen Key (innerhalb des Topics) publiziert, womit vorherige Nachrichten mit demselben Key zur Löschung freigegeben werden.

Soll ein Datensatz eines „compacted“ Topics komplett gelöscht werden, so kann eine Nachricht mit dem entsprechenden Key und einem Value von null publiziert werden. Eine solche Nachricht wird als Tombstone bezeichnet.

Log Compaction spart nicht nur Platz, sondern verbessert auch die Performance in Szenarien, wo häufiger Partition Offsets zurückgesetzt werden um alle Nachrichten des Topics erneut zu verarbeiten.

Weiterführende Themen

Folgend sind tabellarisch aufgelistet weitere Themen, die bei Einführung von Kafka von Interesse sein könnten, aber den Rahmen dieses Artikels sprengen würden:

ThemaBeschreibung
Kafka Connectein Framework zur Anbindung von Kafka an diverse Drittsysteme wie relationale Datenbanken, JMS, Elasticsearch, Amazon S3 oder Google BigQuery
Kafka Streamseine clientseitige Bibliothek für Kafka, die High-Level-Operationen wie map, filter, group, aggregations und joins bietet
Redpanda Console (ehemals Kowl)eine graphische Oberfläche für Kafka Cluster, in der z.B. Topics durchsucht und Offsets für Consumer Groups zurückgesetzt werden können
Remoraein quelloffenes Monitoring-Tool für Kafka, das Kennzahlen wie Consumer Lags (Anzahl unverarbeiteter Nachrichten) als Metriken zur Verfügung stellen und an Drittsysteme wie Datadog und Amazon CloudWatch weiterleiten kann
Skriptein der Kafka-Installation im bin-Verzeichnis sind diverse Skripte enthalten, über die Broker gestartet, Topics angelegt und Einstellungen verändert werden können – entsprechende Beispiel finden sich unter anderem in der Kafka-Dokumentation

Ein Beispiel mit Spring Kafka

Auf GitHub habe ich eine minimale Spring-Boot-Anwendung mit Kotlin gebaut, die Nachrichten an Kafka publiziert und konsumiert. Für diese Anwendung habe ich mehrere Testcontainers-Tests geschrieben, die diesen Vorgang demonstrieren und auch das Verhalten eines Dead Letter Handlers abbilden. In einem weiteren Test wird die zuverlässige Massenverarbeitung von Nachrichten abgetestet. Mit meinem knapp vier Jahre alten MacBook Pro werden im Test zehntausend Nachrichten binnen ca. 1,3 Sekunden publiziert und direkt wieder konsumiert.

Das Setup ist denkbar einfach. In der application.yml werden die in den Nachrichten verwendeten Klassen angegeben. Als einfache POJOs (Plain Old Java Objects) werden diese vom Standard-JsonSerializer/-Deserializer unterstützt. Die Adresse des einzelnen Brokers wird beim Hochfahren desselbigen im Testcontainers-Test ermittelt. Das Topic wird über den in Spring Kafka enthaltenen TopicBuilder als JavaBean angelegt.

Der Producer verwendet KafkaTemplate, um die Instanz einer von vier Klassen in das konfigurierte Topic zu schreiben. Der Vorgang ist asynchron. Über ein Callback können im Erfolgs- oder Fehlerfall weitere Schritte durchgeführt werden, im Beispiel wird im Fehlerfall der Exception-Stacktrace nach standardout geschrieben:

@Component
class ArithmeticOperationProducer(
  val kt: KafkaTemplate<String, ArithmeticOperation>,
  val topicConfiguration: KafkaTopicConfiguration,
) {

  enum class Mode {
    ADDITION,
    SUBTRACTION,
    MULTIPLICATION,
DIVISION
}

  fun produce(mode: Mode, op1: Int, op2: Int) {
    kt.send(
      topicConfiguration.topic,
      when(mode) {
        ADDITION -> Addition(op1, op2)
        SUBTRACTION -> Subtraction(op1, op2)
        MULTIPLICATION -> Multiplication(op1, op2)
        DIVISION -> Division(op1, op2)
      },
    ).whenComplete { _, ex -> ex.printStackTrace() }
  }
}

Der Consumer liest Nachrichten als Key-Value Paar in Form eines ConsumerRecords via Annotation @KafkaListener, in der das Topic und die Consumer Group angegeben sind. Die Annotationen @RetryableTopic am Kafka Listener und @DltHandler an der Fehlerbehandlungsfunktion setzen das Dead Letter Pattern um. In meinem einfachen Beispiel werden Rechenaufgaben via Kafka publiziert und durch die Consumer berechnet. Dabei kommt es regelmäßig zu einem Fehler, wenn der Divisor einer Division den Wert 0 hat (ArithmeticException).

@Component
class ArithmeticOperationConsumer(
  val resultStore: ArithmeticOperationResultStore,
  val errorStore: ArithmeticOperationErrorStore,
) {

  @RetryableTopic(
    attempts = "1",
    dltStrategy = DltStrategy.ALWAYS_RETRY_ON_ERROR
  )
  @KafkaListener(
    topics = ["#{'\${kafkademo.topic}'}"],
    groupId = "kafkademo"
  )
  fun consume(
    r: ConsumerRecord<Any, ArithmeticOperation>
  ) {
    with(r.value()) {
      try {
        resultStore.add(calculateResult())
      } catch (ex: ArithmeticException) {
        handleErroneousArithmeticOperation(this)
      }
    }
  }

  @DltHandler
  fun handleErroneousArithmeticOperation(
    arithmeticOperation: ArithmeticOperation
  ) { errorStore.add(arithmeticOperation) }
}

In meinem einfachen Beispiel werden erfolgreich gelöste Rechenaufgaben in einem ArithmeticOperationResultStore abgelegt, während Fehlschläge, d.h. Teilungen durch null, in einem ArithmeticOperationErrorStore landen. In den definierten Testfällen wird abgeprüft, dass nach Publizierung einer Rechenaufgabe das Ergebnis innerhalb einer tolerablen Zeit in einem der beiden Datenspeicher landet. Der vollständige Code dazu findet sich im nachfolgend verlinkten GitHub-Projekt.

- Minimalbeispiel Producer / Consumer mit Spring Kafka
- Einführung in das Kafka-Protocol