Puncte:1

Nicio conexiune cu Kafka de la clientul Faust

drapel pe

Am o conexiune dificilă la o mașină care rulează Kafka de la un client care rulează un script Faust. Scriptul arată astfel:

import faust
jurnal de import
din asyncio import sleep


Class Test(faust.Record):
    mesaj: str


app = faust.App('myapp', broker='kafka://10.0.0.20:9092')
topic = app.topic('test', value_type=Test)


@app.agent(subiect)
async def salut (mesaje):
    asincron pentru mesajul din mesaje:
        print(f'Primit {message.msg}')


@app.timer(interval=5.0)
async def example_sender():
    așteaptă salut.trimite(
        valoare=Test(msg='Bună lume!'),
    )


if __name__ == '__main__':
    app.main()

Când rulez scriptul:

# faust -A myapp worker -l info
âÆaµSâ v0.8.1ââ¬âââââââââââââââââââââ âââââââââââââââ†âââââââââââââââ†âââââ
â id â myapp â
â transport â [URL('kafka://10.0.0.20:9092')] â
â stocare â memorie: â
â web â http://hubbabubba:6066 â
â jurnal â -stderr- (informații) â
â pid â 260765 â
â nume de gazdă â hubbabubba â
â platformă â CPython 3.8.10 (Linux x86_64) â
â drivere â â
â transport â aiokafka=0,7,2 â
â web â aiohttp=3.8.1 â
â datadir â /Git/faust-kafka/myapp-data â
â appdir â /Git/faust-kafka/myapp-data/v1 â
âââââââââââââââ´ââ âââââââââââââââ†âââââââââââââââ†âââââââââââââââ
[2022-01-28 13:09:57,018] [260765] [INFO] [^Lucrător]: Începe... 
[2022-01-28 13:09:57,021] [260765] [INFO] [^-App]: Începe... 
[2022-01-28 13:09:57,021] [260765] [INFO] [^--Monitor]: Începe... 
[2022-01-28 13:09:57,021] [260765] [INFO] [^--Producător]: Începe... 
[2022-01-28 13:09:57,022] [260765] [INFO] [^---ProducerBuffer]: Începe... 
[2022-01-28 13:09:57,024] [260765] [EROARE] Nu se poate conecta la „10.0.0.20:9092”: [Errno 113] Apel de conectare eșuat („10.0.0.20”, 9092) 
[2022-01-28 13:09:57,025] [260765] [EROARE] [^Worker]: Eroare: KafkaConnectionError ("Nu se poate porni bootstrap de la [('10.0.0.20', 9092, <AddressFamily.AF_INET: 2>) ]") 
Traceback (cel mai recent apel ultimul):
  Fișierul „/Git/faust-kafka/venv/lib/python3.8/site-packages/mode/worker.py”, linia 276, în execute_from_commandline
    self.loop.run_until_complete(self._starting_fut)
  Fișierul „/usr/lib/python3.8/asyncio/base_events.py”, linia 616, în run_until_complete
    returnează viitor.rezultat()
  Fișierul „/Git/faust-kafka/venv/lib/python3.8/site-packages/mode/services.py”, linia 759, la început
    așteaptă self._default_start()
  Fișierul „/media/eric/DISK3/Git/faust-kafka/venv/lib/python3.8/site-packages/mode/services.py”, linia 766, în _default_start
    aştept self._actually_start()...
  Fișierul „/Git/faust-kafka/venv/lib/python3.8/site-packages/aiokafka/client.py”, linia 249, în bootstrap
    ridicați KafkaConnectionError(
kafka.errors.KafkaConnectionError: KafkaConnectionError: Imposibil de pornit de la [('10.0.0.20', 9092, <AddressFamily.AF_INET: 2>)]
[2022-01-28 13:09:57,027] [260765] [INFO] [^Lucrător]: Oprire... 
[2022-01-28 13:09:57,027] [260765] [INFO] [^-App]: Se opresc... 
[2022-01-28 13:09:57,027] [260765] [INFO] [^-App]: Spălați tamponul producătorului... 
[2022-01-28 13:09:57,028] [260765] [INFO] [^--TableManager]: Se opresc... 
[2022-01-28 13:09:57,028] [260765] [INFO] [^---Fetcher]: Oprire... 
[2022-01-28 13:09:57,028] [260765] [INFO] [^---Dirijor]: Oprire... 
[2022-01-28 13:09:57,028] [260765] [INFO] [^--AgentManager]: Se opresc... 
[2022-01-28 13:09:57,029] [260765] [INFO] [^Agent: myapp.hello]: Oprire... 
[2022-01-28 13:09:57,029] [260765] [INFO] [^--ReplyConsumer]: Oprire... 
[2022-01-28 13:09:57,029] [260765] [INFO] [^--LeaderAssignor]: Oprire... 
[2022-01-28 13:09:57,029] [260765] [INFO] [^--Consumator]: Oprire... 
[2022-01-28 13:09:57,030] [260765] [INFO] [^--Web]: Oprire... 
[2022-01-28 13:09:57,030] [260765] [INFO] [^--CacheBackend]: Se opresc... 
[2022-01-28 13:09:57,030] [260765] [INFO] [^--Producător]: Oprire... 
[2022-01-28 13:09:57,030] [260765] [INFO] [^---ProducerBuffer]: Oprire... 
[2022-01-28 13:09:57,031] [260765] [INFO] [^--Monitor]: Oprire... 
[2022-01-28 13:09:57,032] [260765] [INFO] [^Lucrător]: Se adună sarcini de serviciu... 
[2022-01-28 13:09:57,032] [260765] [INFO] [^Worker]: Adunăm toate viitoarele... 
[2022-01-28 13:09:58,033] [260765] [INFO] [^Worker]: Se închide bucla de eveniment

Kafka (v.2.8.1) rulează pe 10.0.0.20, portul 9092. Configurația Kafka arată astfel:

# Licențiat la Apache Software Foundation (ASF) sub unul sau mai multe
# acorduri de licență pentru colaboratori. Vedeți fișierul NOTICE distribuit cu
# această lucrare pentru informații suplimentare cu privire la dreptul de autor.
# ASF vă acordă licența acestui fișier în baza licenței Apache, versiunea 2.0
# („Licența”); nu puteți utiliza acest fișier decât în ​​conformitate cu
# Licența. Puteți obține o copie a licenței la
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Cu excepția cazului în care este cerut de legea aplicabilă sau dacă este convenit în scris, software
# distribuit sub Licență este distribuit „CA AȘA ESTE”,
# FĂRĂ GARANȚII SAU CONDIȚII DE NICIUN FEL, fie exprese, fie implicite.
# Consultați Licența pentru permisiunile specifice limbii care guvernează și
# limitări conform Licenței.

# vezi kafka.server.KafkaConfig pentru detalii suplimentare și valori implicite

############################ Elementele de bază ale serverului ################### ##########

# Id-ul brokerului. Acesta trebuie setat la un număr întreg unic pentru fiecare broker.
broker.id=0

############################ Setări Socket Server ################## ###########

# Adresa pe care ascultă serverul de socket. Acesta va primi valoarea returnată de la 
# java.net.InetAddress.getCanonicalHostName() dacă nu este configurat.
# FORMAT:
# ascultători = listener_name://host_name:port
# EXEMPLU:
# ascultători = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://:9092

# Numele de gazdă și portul pe care brokerul le va face publicitate producătorilor și consumatorilor. Dacă nu este setat, 
# folosește valoarea pentru „ascultători” dacă este configurat. În caz contrar, va folosi valoarea
# returnat de la java.net.InetAddress.getCanonicalHostName().
advertised.listeners=PLAINTEXT://localhost:9092

# Mapează numele ascultătorilor la protocoalele de securitate, implicit este ca acestea să fie aceleași. Consultați documentația de configurare pentru mai multe detalii
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL

# Numărul de fire de execuție pe care serverul le folosește pentru a primi cereri din rețea și a trimite răspunsuri către rețea
num.network.threads=3

# Numărul de fire de execuție pe care serverul le folosește pentru procesarea cererilor, care pot include I/O pe disc
num.io.threads=8

# Bufferul de trimitere (SO_SNDBUF) utilizat de serverul socket
socket.send.buffer.bytes=102400

# Bufferul de primire (SO_RCVBUF) folosit de serverul socket
socket.receive.buffer.bytes=102400

# Dimensiunea maximă a unei solicitări pe care serverul de socket o va accepta (protecție împotriva OOM)
socket.request.max.bytes=104857600


############################ Elementele de bază ale jurnalului ################### ##########

# O listă de directoare separate prin virgulă în care să stocați fișierele jurnal
log.dirs=/tmp/kafka-logs

# Numărul implicit de partiții de jurnal per subiect. Mai multe partiții permit mai multe
# paralelism pentru consum, dar acest lucru va duce, de asemenea, la mai multe fișiere
# brokerii.
num.partitii=1

# Numărul de fire de execuție per director de date care va fi utilizat pentru recuperarea jurnalului la pornire și spălarea la închidere.
# Se recomandă creșterea acestei valori pentru instalațiile cu direcții de date situate în matricea RAID.
num.recovery.threads.per.data.dir=1

############################ Setări interne ale subiectului ################## ###########
# Factorul de replicare pentru subiectele interne ale metadatelor de grup „__consumer_offsets” și „__transaction_state”
# Pentru orice altceva decât testarea de dezvoltare, se recomandă o valoare mai mare decât 1 pentru a asigura disponibilitatea, cum ar fi 3.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

############################ Politica de eliminare a jurnalului ################## ###########

# Mesajele sunt scrise imediat în sistemul de fișiere, dar implicit doar fsync() pentru sincronizare
# cache-ul sistemului de operare leneș. Următoarele configurații controlează fluxul de date pe disc.
# Există câteva compromisuri importante aici:
# 1. Durabilitate: Datele neîncălcate se pot pierde dacă nu utilizați replicarea.
# 2. Latență: Intervalele foarte mari de spălare pot duce la vârfuri de latență atunci când are loc spălarea, deoarece vor fi multe date de spălat.
# 3. Debit: Spălarea este, în general, cea mai costisitoare operațiune, iar un interval mic de spălare poate duce la căutări excesive.
# Setările de mai jos permit configurarea politicii de spălare pentru a spăla datele după o perioadă de timp sau
# fiecare N mesaje (sau ambele). Acest lucru se poate face la nivel global și poate fi înlocuit în funcție de subiect.

# Numărul de mesaje de acceptat înainte de a forța o spălare de date pe disc
#log.flush.interval.messages=10000

# Cantitatea maximă de timp în care un mesaj poate sta într-un jurnal înainte de a forța o spălare
#log.flush.interval.ms=1000

############################ Politica de păstrare a jurnalelor ################## ###########

# Următoarele configurații controlează eliminarea segmentelor de jurnal. Politica poate
# fie setat pentru a șterge segmente după o perioadă de timp sau după ce s-a acumulat o anumită dimensiune.
# Un segment va fi șters ori de câte ori * oricare* dintre aceste criterii este îndeplinit. Ștergerea are loc întotdeauna
# de la sfârșitul jurnalului.

# Vârsta minimă a unui fișier jurnal pentru a fi eligibil pentru ștergere din cauza vârstei
log.retention.hours=168

# O politică de păstrare a jurnalelor bazată pe dimensiune. Segmentele sunt tăiate din buștean, cu excepția cazului în care rămân
# segmente coboară sub log.retention.bytes. Funcționează independent de log.retention.ore.
#log.retention.bytes=1073741824

# Dimensiunea maximă a unui fișier cu segment de jurnal. Când această dimensiune este atinsă, va fi creat un nou segment de jurnal.
log.segment.bytes=1073741824

# Intervalul la care sunt verificate segmentele de jurnal pentru a vedea dacă pot fi șterse conform
# la politicile de păstrare
log.retention.check.interval.ms=300000

############################# Ingrijitor zoo #################### #########

# șir de conexiune Zookeeper (consultați documentele zookeeper pentru detalii).
# Aceasta este o pereche gazdă:port separată prin virgulă, fiecare corespunzând unui zk
# Server. de exemplu. „127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002”.
# Puteți adăuga, de asemenea, un șir opțional chroot la adresele URL pentru a specifica
# director rădăcină pentru toate znodele kafka.
zookeeper.connect=localhost:2181

# Timeout în ms pentru conectarea la zookeeper
zookeeper.connection.timeout.ms=18000


############################ Setări coordonatorului grupului ################## ###########

# Următoarea configurație specifică timpul, în milisecunde, în care GroupCoordinator va întârzia reechilibrarea inițială a consumatorului.
# Reechilibrarea va fi întârziată și mai mult cu valoarea group.initial.rebalance.delay.ms pe măsură ce noi membri se alătură grupului, până la maximum max.poll.interval.ms.
# Valoarea implicită pentru aceasta este de 3 secunde.
# O înlocuim aici la 0, deoarece oferă o experiență mai bună pentru dezvoltare și testare.
# Cu toate acestea, în mediile de producție, valoarea implicită de 3 secunde este mai potrivită, deoarece aceasta va ajuta la evitarea reechilibrărilor inutile și potențial costisitoare în timpul pornirii aplicației.
group.initial.rebalance.delay.ms=0

Brokerul Kafka începe fără probleme cu:

$ sudo bin/kafka-server-start.sh -daemon config/server.properties 

Încep subiectul cu:

$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --replication-factor 1 --partitions 1 --topic test

Verific apoi cu:

$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
Test

Așa că mă întreb unde m-am încurcat. BTW: Serverul este accesibil de la computerul client:

$ ping -c 5 10.0.0.20 -p 9092
MODEL: 0x9092
PING 10.0.0.20 (10.0.0.20) 56(84) octeți de date.
64 de octeți din 10.0.0.20: icmp_seq=1 ttl=64 time=0,468 ms
64 de octeți din 10.0.0.20: icmp_seq=2 ttl=64 time=0,790 ms
64 de octeți din 10.0.0.20: icmp_seq=3 ttl=64 time=0,918 ms
64 de octeți din 10.0.0.20: icmp_seq=4 ttl=64 time=0,453 ms
64 de octeți din 10.0.0.20: icmp_seq=5 ttl=64 time=0,827 ms

--- 10.0.0.20 statistici ping ---
5 pachete transmise, 5 primite, 0% pierdere de pachete, timp 4095 ms
rtt min/avg/max/mdev = 0,453/0,691/0,918/0,192 ms
Puncte:1
drapel cn

Acest lucru mi se pare greșit, deoarece ar implica că clientul dvs. de la distanță va încerca să se conecteze la gazdă locală odată ce se vorbește cu serverul de bootstrap, nu cu adresa de la distanță a instanței tale kafka:

advertised.listeners=PLAINTEXT://localhost:9092

L-aș schimba pe IP-ul extern (10.x.x.x) al instanței tale kafka, aș reporni totul și aș încerca din nou.

ElToro1966 avatar
drapel pe
Setați advertised.listeners la 10.0.0.20 și adăugați portul 9092 la porturile permise. Repornit totul. Lucrări! Mulțumiri.

Postează un răspuns

Majoritatea oamenilor nu înțeleg că a pune multe întrebări deblochează învățarea și îmbunătățește legătura interpersonală. În studiile lui Alison, de exemplu, deși oamenii își puteau aminti cu exactitate câte întrebări au fost puse în conversațiile lor, ei nu au intuit legătura dintre întrebări și apreciere. În patru studii, în care participanții au fost implicați în conversații ei înșiși sau au citit transcrieri ale conversațiilor altora, oamenii au avut tendința să nu realizeze că întrebarea ar influența – sau ar fi influențat – nivelul de prietenie dintre conversatori.