Am un cluster Kafka care rulează pe un cluster Kubernetes de integrare cu 3 brokeri Kafka și 3 Zookeepers, fiecare componentă rulând într-un set de state.
Diagrama de conducere pe care o folosesc pentru a implementa clusterul este una personalizată, deoarece aveam nevoie de acces extern la cluster printr-un BigIp F5 și nu am găsit o diagramă de conducere care să facă acest lucru.
Imaginea lui Kafka este confluentinc/cp-kafka:5.4.0 iar cel de gardian este confluentinc/cp-zookeeper:5.4.0
/var/lib/zookeeper/data
și /var/lib/zookeeper/log
pentru Zookeeper sunt mapate la volume persistente.
La fel pentru /var/lib/kafka
pe Kafka
Folosesc hlebalbau/kafka-manager:stable pentru a urmări starea clusterului.
Am configurat trei partiții pe subiect, iar factorul de replicare este, de asemenea, egal cu trei.
Recent mi-am dat seama că dacă am repornit toți cei trei brokeri kafka în același timp (cu kubectl șterge pod
) tot conținutul subiectelor a fost pierdut:
- dimensiunea jurnalului scade la zero pentru fiecare subiect
- lista de subiecte rămâne aceeași
- lista consumatorilor rămâne aceeași, dar fiecare decalaj curent al consumatorului scade la o valoare negativă (dacă consumatorul a fost la offset 10000 pentru un subiect cu 10000 de mesaje, atunci dimensiunea subiectului scade la zero și consumatorul offset la -10000)
Nu am întâmpinat niciodată nicio problemă la repornirea unui broker kafka la un moment dat și așteptând ca acesta să fie pornit înainte de a reporni altul.
Știu că un cluster kafka nu este menit să fie oprit sau repornit astfel. Dar nu mă așteptam la un astfel de comportament.
Este un comportament așteptat? Sau am omis ceva evident?
Iată șablonul meu Yaml pentru un broker kafka. După cum indică numele său, the așteaptă-păsător de grădină zoologică.sh
scriptul „doar” așteaptă să înceapă paznicii zoo.
---
apiVersion: apps/v1
fel: StatefulSet
metadate:
nume: kafka-controller-1
specificație:
replici: 1
selector:
matchLabels:
aplicație: kafka-1
serviceName: kafka1-headless
șablon:
metadate:
etichete:
aplicație: kafka-1
cluster: kafka
specificație:
initContainers:
- nume: init-wait-zookeeper
imagine: bash:latest
comandă: ["/usr/local/bin/bash","-c","cp /wait-zookeeper-configmap/wait-zookeeper.sh /wait-zookeeper-emptydir/ && chmod 755 /wait-zookeeper-emptydir/ wait-zookeeper.sh && /wait-zookeeper-emptydir/wait-zookeeper.sh"]
volumMonturi:
- nume: wait-zookeeper-configmap
mountPath: /wait-zookeeper-configmap
- nume: wait-zookeeper-emptydir
mountPath: /wait-zookeeper-emptydir
afinitate:
podAntiAffinity:
preferredDuringSchedulingIgnoredDuringExecution:
- greutate: 100
podAffinityTerm:
labelSelector:
matchExpressions:
- cheie: app
operator: In
valori:
- kafka-2
- kafka-3
{{- if gt .Values.workerNodesNumber 5.0 }}
- îngrijitor de grădină zoologică-1
- îngrijitor de grădină zoologică-2
- îngrijitor de grădină zoologică-3
{{- Sfârşit }}
topologyKey: „kubernetes.io/hostname”
containere:
- nume: kafka1
imagine: confluentinc/cp-kafka:5.4.0
resurse:
cereri:
memorie: "512Mi"
limite:
memorie: „{{.Values.jvmMaxHeapSizeGb}}Gi”
porturi:
- containerPort: 9092
env:
- nume: HOST_IP
valoareDe la:
fieldRef:
fieldPath: status.hostIP
- nume: KAFKA_LISTENERS
valoare: „PLAINTEXT://0.0.0.0:9092”
- nume: KAFKA_ADVERTISED_LISTENERS
valoare: „PLAINTEXT://$(HOST_IP):{{ adăugați .Values.startingNodePort 0 }}”
- nume: KAFKA_BROKER_ID
valoare: "10"
- nume: KAFKA_ZOOKEEPER_CONNECT
valoare: „zookeeper-controller-1-0.zoo1-headless:2181, zookeeper-controller-2-0.zoo2-headless:2181, zookeeper-controller-3-0.zoo3-headless:2181”
- nume: KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR
valoare: "3"
- nume: KAFKA_DELETE_TOPIC_ENABLE
valoare: „adevărat”
- nume: KAFKA_DEFAULT_REPLICATION_FACTOR
valoare: "3"
- nume: KAFKA_NUM_PARTITIONS
valoare: „{{.Values.defaultPartitionsNumber}}”
- nume: KAFKA_LOG_RETENTION_HOURS
valoare: „{{.Values.retentionTimeHours}}”
- nume: KAFKA_OFFSETS_RETENTION_MINUTES
valoare: „{{.Values.retentionTimeHours | mul 60 }}”
- nume: JMX_PORT
valoare: „{{ add .Values.startingNodePort 3 }}”
- nume: KAFKA_JMX_HOSTNAME
valoare: "kafka-service-1"
- nume: KAFKA_HEAP_OPTS
valoare: „-Xms512m -Xmx{{.Values.jvmMaxHeapSizeGb}}G”
livenessProbe:
executiv:
comanda:
- /bin/bash
- -c
- „unset JMX_PORT && kafka-broker-api-versions --bootstrap-server=localhost:9092”
initialDelaySeconds: 60
perioadaSecunde: 20
volumMonturi:
- nume: "kafka-logs"
mountPath: „/var/lib/kafka”
volume:
- nume: „wait-zookeeper-configmap”
configMap:
nume: "kafka-initializator"
articole:
- cheie: „wait-zookeeper.sh”
cale: „wait-zookeeper.sh”
- nume: „wait-zookeeper-emptydir”
emptyDir: {}
volumeClaimTemplates:
- metadate:
nume: kafka-busteni
specificație:
storageClassName: {{.Values.storageClassName}}
AccessModes: ["ReadWriteOnce"]
resurse:
cereri:
stocare: {{.Values.storageSizeGb}}Gi