Warunek wstępny
Musisz zainstalować niezbędną bibliotekę Pythona, aby odczytywać dane z Kafki. Python3 jest używany w tym samouczku do napisania skryptu konsumenta i producenta. Jeśli pakiet pip nie był wcześniej zainstalowany w twoim systemie operacyjnym Linux, musisz zainstalować pip przed zainstalowaniem biblioteki Kafka dla Pythona. python3-kafka jest używany w tym samouczku do odczytywania danych z Kafka. Uruchom następujące polecenie, aby zainstalować bibliotekę.
$ pip zainstaluj python3-kafkaCzytanie prostych danych tekstowych z Kafka
Od producenta można przesyłać różne rodzaje danych na określony temat, które mogą być odczytane przez konsumenta. W tej części tutoriala pokazujemy jak proste dane tekstowe mogą być wysyłane i odbierane z Kafki za pomocą producenta i konsumenta.
Utwórz plik o nazwie producent1.py z następującym skryptem Pythona. KafkaProducent moduł jest importowany z biblioteki Kafki. Lista brokerów musi zostać zdefiniowana w momencie inicjalizacji obiektu producenta, aby połączyć się z serwerem Kafka. Domyślny port Kafki to '9092'. Argument bootstrap_servers służy do zdefiniowania nazwy hosta z portem. 'Pierwszy_temat' jest ustawiony jako nazwa tematu, pod którym sms będzie wysyłany od producenta. Następnie prosta wiadomość tekstowa „Witam z Kafki' jest wysyłany za pomocą wysłać() metoda KafkaProducent do tematu 'Pierwszy_temat'.
producent1.py:
# Importuj KafkaProducer z biblioteki Kafkaz importu kafki KafkaProducer
# Zdefiniuj serwer z portem
bootstrap_servers = ['localhost:9092']
# Określ nazwę tematu, w którym wiadomość zostanie opublikowana
nazwa_tematu = 'Pierwszy_Temat'
# Zainicjuj zmienną producenta
producent = KafkaProducer(bootstrap_servers = bootstrap_servers)
# Opublikuj tekst w określonym temacie
producent.send(topicName, b'Hello from kafka… ')
# Drukuj wiadomość
print("Wiadomość wysłana")
Utwórz plik o nazwie konsument1.py z następującym skryptem Pythona. KafkaConsumer moduł jest importowany z biblioteki Kafka do odczytu danych z Kafka. system moduł jest tutaj używany do zakończenia skryptu. Ta sama nazwa hosta i numer portu producenta są używane w skrypcie konsumenta do odczytu danych z Kafka. Nazwa tematu konsumenta i producenta musi być taka sama, jak „Pierwszy_temat'. Następnie obiekt konsumenta jest inicjowany trzema argumentami. Nazwa tematu, identyfikator grupy i informacje o serwerze. dla pętla służy tutaj do odczytania tekstu wysłanego od producenta Kafki.
konsument1.py:
# Importuj KafkaConsumer z biblioteki Kafkaz importu kafki KafkaConsumer
# Importuj moduł sys
system importu
# Zdefiniuj serwer z portem
bootstrap_servers = ['localhost:9092']
# Określ nazwę tematu, z którego otrzyma wiadomość
nazwa_tematu = 'Pierwszy_Temat'
# Zainicjuj zmienną konsumenta
konsument = KafkaConsumer (topicName, group_id ='group1',bootstrap_servers =
bootstrap_servers)
# Przeczytaj i wydrukuj wiadomość od konsumenta
dla wiadomości w konsumencie:
print("Nazwa tematu=%s,Wiadomość=%s"%(msg.temat,wiadomość.wartość))
# Zakończ skrypt
system.Wyjście()
Wynik:
Uruchom następujące polecenie z jednego terminala, aby wykonać skrypt producenta.
$python3 producent1.pyNastępujące dane wyjściowe pojawią się po wysłaniu wiadomości.
Uruchom następujące polecenie z innego terminala, aby wykonać skrypt konsumencki.
$ python3 konsument1.pyWyjście pokazuje nazwę tematu i wiadomość tekstową wysłaną od producenta.
Odczytywanie danych w formacie JSON z Kafki
Dane w formacie JSON mogą być przesyłane przez producenta Kafki i odczytywane przez konsumenta Kafki za pomocą json moduł Pythona. Jak dane JSON mogą być serializowane i deserializowane przed wysłaniem i odebraniem danych za pomocą modułu python-kafka pokazano w tej części tego samouczka.
Utwórz skrypt Pythona o nazwie producent2.py z następującym skryptem. Inny moduł o nazwie JSON jest importowany z KafkaProducent moduł tutaj. serializator_wartości argument jest używany z bootstrap_servers tutaj argument, aby zainicjalizować obiekt producenta Kafki. Ten argument wskazuje, że dane JSON będą kodowane przy użyciu „utf-8' zestaw znaków w momencie wysyłania. Następnie dane w formacie JSON są wysyłane do tematu o nazwie JSONtopic.
producent2.py:
# Importuj KafkaProducer z biblioteki Kafkaz importu kafki KafkaProducer
# Importuj moduł JSON do serializacji danych
importuj json
# Zainicjuj zmienną producenta i ustaw parametr dla kodowania JSON
producent = KafkaProducer(bootstrap_servers =
['localhost:9092'],value_serializer=lambda v:json.wysypiska(v).koduj('utf-8'))
# Wyślij dane w formacie JSON
producent.send('JSONtopic', 'name': 'fahmida','email':'[email protected]')
# Drukuj wiadomość
print("Wiadomość wysłana do JSONtopic")
Utwórz skrypt Pythona o nazwie konsument2.py z następującym skryptem. KafkaConsumer, system a moduły JSON są importowane w tym skrypcie. KafkaConsumer moduł służy do odczytu danych w formacie JSON z Kafka. Moduł JSON służy do dekodowania zakodowanych danych JSON wysyłanych od producenta Kafkaka. Sys moduł służy do zakończenia skryptu. value_deserializer argument jest używany z bootstrap_servers aby zdefiniować sposób dekodowania danych JSON. Kolejny, dla pętla służy do drukowania wszystkich rekordów konsumentów i danych JSON pobranych z Kafki.
konsument2.py:
# Importuj KafkaConsumer z biblioteki Kafkaz importu kafki KafkaConsumer
# Importuj moduł sys
system importu
# Importuj moduł json do serializacji danych
importuj json
# Zainicjuj zmienną konsumenta i ustaw właściwość dla dekodowania JSON
konsument = KafkaConsumer ('JSONtopic',bootstrap_servers = ['localhost:9092'],
value_deserializer=lambda m: json.ładunki (m.dekodować('utf-8')))
# Odczytaj dane z kafki
dla wiadomości w konsumencie:
print("Rekordy konsumentów:\n")
drukuj (wiadomość)
print("\nCzytanie z danych JSON\n")
print("Nazwa:",wiadomość[6]['imię'])
print("E-mail:",wiadomość[6]['e-mail'])
# Zakończ skrypt
system.Wyjście()
Wynik:
Uruchom następujące polecenie z jednego terminala, aby wykonać skrypt producenta.
$ python3 producent2.pyPo przesłaniu danych JSON skrypt wydrukuje następującą wiadomość message.
Uruchom następujące polecenie z innego terminala, aby wykonać skrypt konsumencki.
$ python3 konsument2.pyNastępujące dane wyjściowe pojawią się po uruchomieniu skryptu.
Wniosek:
Dane mogą być wysyłane i odbierane w różnych formatach od Kafki za pomocą Pythona. Dane mogą być również przechowywane w bazie danych i pobierane z bazy danych za pomocą Kafki i Pythona. W domu, ten samouczek pomoże użytkownikowi Pythona rozpocząć pracę z Kafką.