Am o conexiune dificilă la o mașină care rulează Kafka de la un client care rulează un script Faust. Scriptul arată astfel:
import faust
jurnal de import
din asyncio import sleep
Class Test(faust.Record):
mesaj: str
app = faust.App('myapp', broker='kafka://10.0.0.20:9092')
topic = app.topic('test', value_type=Test)
@app.agent(subiect)
async def salut (mesaje):
asincron pentru mesajul din mesaje:
print(f'Primit {message.msg}')
@app.timer(interval=5.0)
async def example_sender():
așteaptă salut.trimite(
valoare=Test(msg='Bună lume!'),
)
if __name__ == '__main__':
app.main()
Când rulez scriptul:
# faust -A myapp worker -l info
âÆaµSâ v0.8.1ââ¬âââââââââââââââââââââ âââââââââââââââ†âââââââââââââââ†âââââ
â id â myapp â
â transport â [URL('kafka://10.0.0.20:9092')] â
â stocare â memorie: â
â web â http://hubbabubba:6066 â
â jurnal â -stderr- (informații) â
â pid â 260765 â
â nume de gazdă â hubbabubba â
â platformă â CPython 3.8.10 (Linux x86_64) â
â drivere â â
â transport â aiokafka=0,7,2 â
â web â aiohttp=3.8.1 â
â datadir â /Git/faust-kafka/myapp-data â
â appdir â /Git/faust-kafka/myapp-data/v1 â
âââââââââââââââ´ââ âââââââââââââââ†âââââââââââââââ†âââââââââââââââ
[2022-01-28 13:09:57,018] [260765] [INFO] [^Lucrător]: Începe...
[2022-01-28 13:09:57,021] [260765] [INFO] [^-App]: Începe...
[2022-01-28 13:09:57,021] [260765] [INFO] [^--Monitor]: Începe...
[2022-01-28 13:09:57,021] [260765] [INFO] [^--Producător]: Începe...
[2022-01-28 13:09:57,022] [260765] [INFO] [^---ProducerBuffer]: Începe...
[2022-01-28 13:09:57,024] [260765] [EROARE] Nu se poate conecta la „10.0.0.20:9092”: [Errno 113] Apel de conectare eșuat („10.0.0.20”, 9092)
[2022-01-28 13:09:57,025] [260765] [EROARE] [^Worker]: Eroare: KafkaConnectionError ("Nu se poate porni bootstrap de la [('10.0.0.20', 9092, <AddressFamily.AF_INET: 2>) ]")
Traceback (cel mai recent apel ultimul):
Fișierul „/Git/faust-kafka/venv/lib/python3.8/site-packages/mode/worker.py”, linia 276, în execute_from_commandline
self.loop.run_until_complete(self._starting_fut)
Fișierul „/usr/lib/python3.8/asyncio/base_events.py”, linia 616, în run_until_complete
returnează viitor.rezultat()
Fișierul „/Git/faust-kafka/venv/lib/python3.8/site-packages/mode/services.py”, linia 759, la început
așteaptă self._default_start()
Fișierul „/media/eric/DISK3/Git/faust-kafka/venv/lib/python3.8/site-packages/mode/services.py”, linia 766, în _default_start
aştept self._actually_start()...
Fișierul „/Git/faust-kafka/venv/lib/python3.8/site-packages/aiokafka/client.py”, linia 249, în bootstrap
ridicați KafkaConnectionError(
kafka.errors.KafkaConnectionError: KafkaConnectionError: Imposibil de pornit de la [('10.0.0.20', 9092, <AddressFamily.AF_INET: 2>)]
[2022-01-28 13:09:57,027] [260765] [INFO] [^Lucrător]: Oprire...
[2022-01-28 13:09:57,027] [260765] [INFO] [^-App]: Se opresc...
[2022-01-28 13:09:57,027] [260765] [INFO] [^-App]: Spălați tamponul producătorului...
[2022-01-28 13:09:57,028] [260765] [INFO] [^--TableManager]: Se opresc...
[2022-01-28 13:09:57,028] [260765] [INFO] [^---Fetcher]: Oprire...
[2022-01-28 13:09:57,028] [260765] [INFO] [^---Dirijor]: Oprire...
[2022-01-28 13:09:57,028] [260765] [INFO] [^--AgentManager]: Se opresc...
[2022-01-28 13:09:57,029] [260765] [INFO] [^Agent: myapp.hello]: Oprire...
[2022-01-28 13:09:57,029] [260765] [INFO] [^--ReplyConsumer]: Oprire...
[2022-01-28 13:09:57,029] [260765] [INFO] [^--LeaderAssignor]: Oprire...
[2022-01-28 13:09:57,029] [260765] [INFO] [^--Consumator]: Oprire...
[2022-01-28 13:09:57,030] [260765] [INFO] [^--Web]: Oprire...
[2022-01-28 13:09:57,030] [260765] [INFO] [^--CacheBackend]: Se opresc...
[2022-01-28 13:09:57,030] [260765] [INFO] [^--Producător]: Oprire...
[2022-01-28 13:09:57,030] [260765] [INFO] [^---ProducerBuffer]: Oprire...
[2022-01-28 13:09:57,031] [260765] [INFO] [^--Monitor]: Oprire...
[2022-01-28 13:09:57,032] [260765] [INFO] [^Lucrător]: Se adună sarcini de serviciu...
[2022-01-28 13:09:57,032] [260765] [INFO] [^Worker]: Adunăm toate viitoarele...
[2022-01-28 13:09:58,033] [260765] [INFO] [^Worker]: Se închide bucla de eveniment
Kafka (v.2.8.1) rulează pe 10.0.0.20, portul 9092. Configurația Kafka arată astfel:
# Licențiat la Apache Software Foundation (ASF) sub unul sau mai multe
# acorduri de licență pentru colaboratori. Vedeți fișierul NOTICE distribuit cu
# această lucrare pentru informații suplimentare cu privire la dreptul de autor.
# ASF vă acordă licența acestui fișier în baza licenței Apache, versiunea 2.0
# („Licența”); nu puteți utiliza acest fișier decât în conformitate cu
# Licența. Puteți obține o copie a licenței la
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Cu excepția cazului în care este cerut de legea aplicabilă sau dacă este convenit în scris, software
# distribuit sub Licență este distribuit „CA AȘA ESTE”,
# FĂRĂ GARANȚII SAU CONDIȚII DE NICIUN FEL, fie exprese, fie implicite.
# Consultați Licența pentru permisiunile specifice limbii care guvernează și
# limitări conform Licenței.
# vezi kafka.server.KafkaConfig pentru detalii suplimentare și valori implicite
############################ Elementele de bază ale serverului ################### ##########
# Id-ul brokerului. Acesta trebuie setat la un număr întreg unic pentru fiecare broker.
broker.id=0
############################ Setări Socket Server ################## ###########
# Adresa pe care ascultă serverul de socket. Acesta va primi valoarea returnată de la
# java.net.InetAddress.getCanonicalHostName() dacă nu este configurat.
# FORMAT:
# ascultători = listener_name://host_name:port
# EXEMPLU:
# ascultători = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://:9092
# Numele de gazdă și portul pe care brokerul le va face publicitate producătorilor și consumatorilor. Dacă nu este setat,
# folosește valoarea pentru „ascultători” dacă este configurat. În caz contrar, va folosi valoarea
# returnat de la java.net.InetAddress.getCanonicalHostName().
advertised.listeners=PLAINTEXT://localhost:9092
# Mapează numele ascultătorilor la protocoalele de securitate, implicit este ca acestea să fie aceleași. Consultați documentația de configurare pentru mai multe detalii
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
# Numărul de fire de execuție pe care serverul le folosește pentru a primi cereri din rețea și a trimite răspunsuri către rețea
num.network.threads=3
# Numărul de fire de execuție pe care serverul le folosește pentru procesarea cererilor, care pot include I/O pe disc
num.io.threads=8
# Bufferul de trimitere (SO_SNDBUF) utilizat de serverul socket
socket.send.buffer.bytes=102400
# Bufferul de primire (SO_RCVBUF) folosit de serverul socket
socket.receive.buffer.bytes=102400
# Dimensiunea maximă a unei solicitări pe care serverul de socket o va accepta (protecție împotriva OOM)
socket.request.max.bytes=104857600
############################ Elementele de bază ale jurnalului ################### ##########
# O listă de directoare separate prin virgulă în care să stocați fișierele jurnal
log.dirs=/tmp/kafka-logs
# Numărul implicit de partiții de jurnal per subiect. Mai multe partiții permit mai multe
# paralelism pentru consum, dar acest lucru va duce, de asemenea, la mai multe fișiere
# brokerii.
num.partitii=1
# Numărul de fire de execuție per director de date care va fi utilizat pentru recuperarea jurnalului la pornire și spălarea la închidere.
# Se recomandă creșterea acestei valori pentru instalațiile cu direcții de date situate în matricea RAID.
num.recovery.threads.per.data.dir=1
############################ Setări interne ale subiectului ################## ###########
# Factorul de replicare pentru subiectele interne ale metadatelor de grup „__consumer_offsets” și „__transaction_state”
# Pentru orice altceva decât testarea de dezvoltare, se recomandă o valoare mai mare decât 1 pentru a asigura disponibilitatea, cum ar fi 3.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
############################ Politica de eliminare a jurnalului ################## ###########
# Mesajele sunt scrise imediat în sistemul de fișiere, dar implicit doar fsync() pentru sincronizare
# cache-ul sistemului de operare leneș. Următoarele configurații controlează fluxul de date pe disc.
# Există câteva compromisuri importante aici:
# 1. Durabilitate: Datele neîncălcate se pot pierde dacă nu utilizați replicarea.
# 2. Latență: Intervalele foarte mari de spălare pot duce la vârfuri de latență atunci când are loc spălarea, deoarece vor fi multe date de spălat.
# 3. Debit: Spălarea este, în general, cea mai costisitoare operațiune, iar un interval mic de spălare poate duce la căutări excesive.
# Setările de mai jos permit configurarea politicii de spălare pentru a spăla datele după o perioadă de timp sau
# fiecare N mesaje (sau ambele). Acest lucru se poate face la nivel global și poate fi înlocuit în funcție de subiect.
# Numărul de mesaje de acceptat înainte de a forța o spălare de date pe disc
#log.flush.interval.messages=10000
# Cantitatea maximă de timp în care un mesaj poate sta într-un jurnal înainte de a forța o spălare
#log.flush.interval.ms=1000
############################ Politica de păstrare a jurnalelor ################## ###########
# Următoarele configurații controlează eliminarea segmentelor de jurnal. Politica poate
# fie setat pentru a șterge segmente după o perioadă de timp sau după ce s-a acumulat o anumită dimensiune.
# Un segment va fi șters ori de câte ori * oricare* dintre aceste criterii este îndeplinit. Ștergerea are loc întotdeauna
# de la sfârșitul jurnalului.
# Vârsta minimă a unui fișier jurnal pentru a fi eligibil pentru ștergere din cauza vârstei
log.retention.hours=168
# O politică de păstrare a jurnalelor bazată pe dimensiune. Segmentele sunt tăiate din buștean, cu excepția cazului în care rămân
# segmente coboară sub log.retention.bytes. Funcționează independent de log.retention.ore.
#log.retention.bytes=1073741824
# Dimensiunea maximă a unui fișier cu segment de jurnal. Când această dimensiune este atinsă, va fi creat un nou segment de jurnal.
log.segment.bytes=1073741824
# Intervalul la care sunt verificate segmentele de jurnal pentru a vedea dacă pot fi șterse conform
# la politicile de păstrare
log.retention.check.interval.ms=300000
############################# Ingrijitor zoo #################### #########
# șir de conexiune Zookeeper (consultați documentele zookeeper pentru detalii).
# Aceasta este o pereche gazdă:port separată prin virgulă, fiecare corespunzând unui zk
# Server. de exemplu. „127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002”.
# Puteți adăuga, de asemenea, un șir opțional chroot la adresele URL pentru a specifica
# director rădăcină pentru toate znodele kafka.
zookeeper.connect=localhost:2181
# Timeout în ms pentru conectarea la zookeeper
zookeeper.connection.timeout.ms=18000
############################ Setări coordonatorului grupului ################## ###########
# Următoarea configurație specifică timpul, în milisecunde, în care GroupCoordinator va întârzia reechilibrarea inițială a consumatorului.
# Reechilibrarea va fi întârziată și mai mult cu valoarea group.initial.rebalance.delay.ms pe măsură ce noi membri se alătură grupului, până la maximum max.poll.interval.ms.
# Valoarea implicită pentru aceasta este de 3 secunde.
# O înlocuim aici la 0, deoarece oferă o experiență mai bună pentru dezvoltare și testare.
# Cu toate acestea, în mediile de producție, valoarea implicită de 3 secunde este mai potrivită, deoarece aceasta va ajuta la evitarea reechilibrărilor inutile și potențial costisitoare în timpul pornirii aplicației.
group.initial.rebalance.delay.ms=0
Brokerul Kafka începe fără probleme cu:
$ sudo bin/kafka-server-start.sh -daemon config/server.properties
Încep subiectul cu:
$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --replication-factor 1 --partitions 1 --topic test
Verific apoi cu:
$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
Test
Așa că mă întreb unde m-am încurcat. BTW: Serverul este accesibil de la computerul client:
$ ping -c 5 10.0.0.20 -p 9092
MODEL: 0x9092
PING 10.0.0.20 (10.0.0.20) 56(84) octeți de date.
64 de octeți din 10.0.0.20: icmp_seq=1 ttl=64 time=0,468 ms
64 de octeți din 10.0.0.20: icmp_seq=2 ttl=64 time=0,790 ms
64 de octeți din 10.0.0.20: icmp_seq=3 ttl=64 time=0,918 ms
64 de octeți din 10.0.0.20: icmp_seq=4 ttl=64 time=0,453 ms
64 de octeți din 10.0.0.20: icmp_seq=5 ttl=64 time=0,827 ms
--- 10.0.0.20 statistici ping ---
5 pachete transmise, 5 primite, 0% pierdere de pachete, timp 4095 ms
rtt min/avg/max/mdev = 0,453/0,691/0,918/0,192 ms