Apache Kafka

Jak czytać dane z Kafki za pomocą Pythona

Jak czytać dane z Kafki za pomocą Pythona
Kafka to rozproszony system przesyłania wiadomości o otwartym kodzie źródłowym do wysyłania wiadomości w podzielonych na partycje i różnych tematach. Strumieniowe przesyłanie danych w czasie rzeczywistym można zaimplementować za pomocą Kafki do odbierania danych między aplikacjami. Składa się z trzech głównych części. Są to producent, konsument i tematy. Producent służy do wysyłania wiadomości na określony temat, a do każdej wiadomości dołączany jest klucz. Konsument jest używany do odczytywania wiadomości na określony temat z zestawu partycji. Dane otrzymane od producenta i przechowywane na partycjach w oparciu o konkretny temat. Wiele bibliotek istnieje w Pythonie, aby stworzyć producenta i konsumenta, aby zbudować system przesyłania wiadomości za pomocą Kafka. W tym samouczku pokazano, jak dane z Kafki można odczytać za pomocą Pythona.

Warunek wstępny

Musisz zainstalować niezbędną bibliotekę Pythona, aby odczytywać dane z Kafki. Python3 jest używany w tym samouczku do napisania skryptu konsumenta i producenta. Jeśli pakiet pip nie był wcześniej zainstalowany w twoim systemie operacyjnym Linux, musisz zainstalować pip przed zainstalowaniem biblioteki Kafka dla Pythona. python3-kafka jest używany w tym samouczku do odczytywania danych z Kafka. Uruchom następujące polecenie, aby zainstalować bibliotekę.

$ pip zainstaluj python3-kafka

Czytanie prostych danych tekstowych z Kafka

Od producenta można przesyłać różne rodzaje danych na określony temat, które mogą być odczytane przez konsumenta. W tej części tutoriala pokazujemy jak proste dane tekstowe mogą być wysyłane i odbierane z Kafki za pomocą producenta i konsumenta.

Utwórz plik o nazwie producent1.py z następującym skryptem Pythona. KafkaProducent moduł jest importowany z biblioteki Kafki. Lista brokerów musi zostać zdefiniowana w momencie inicjalizacji obiektu producenta, aby połączyć się z serwerem Kafka. Domyślny port Kafki to '9092'. Argument bootstrap_servers służy do zdefiniowania nazwy hosta z portem. 'Pierwszy_temat' jest ustawiony jako nazwa tematu, pod którym sms będzie wysyłany od producenta. Następnie prosta wiadomość tekstowa „Witam z Kafki' jest wysyłany za pomocą wysłać() metoda KafkaProducent do tematu 'Pierwszy_temat'.

producent1.py:

# Importuj KafkaProducer z biblioteki Kafka
z importu kafki KafkaProducer
# Zdefiniuj serwer z portem
bootstrap_servers = ['localhost:9092']
# Określ nazwę tematu, w którym wiadomość zostanie opublikowana
nazwa_tematu = 'Pierwszy_Temat'
# Zainicjuj zmienną producenta
producent = KafkaProducer(bootstrap_servers = bootstrap_servers)
# Opublikuj tekst w określonym temacie
producent.send(topicName, b'Hello from kafka… ')
# Drukuj wiadomość
print("Wiadomość wysłana")

Utwórz plik o nazwie konsument1.py z następującym skryptem Pythona. KafkaConsumer moduł jest importowany z biblioteki Kafka do odczytu danych z Kafka. system moduł jest tutaj używany do zakończenia skryptu. Ta sama nazwa hosta i numer portu producenta są używane w skrypcie konsumenta do odczytu danych z Kafka. Nazwa tematu konsumenta i producenta musi być taka sama, jak „Pierwszy_temat'.  Następnie obiekt konsumenta jest inicjowany trzema argumentami. Nazwa tematu, identyfikator grupy i informacje o serwerze. dla pętla służy tutaj do odczytania tekstu wysłanego od producenta Kafki.

konsument1.py:

# Importuj KafkaConsumer z biblioteki Kafka
z importu kafki KafkaConsumer
# Importuj moduł sys
system importu
# Zdefiniuj serwer z portem
bootstrap_servers = ['localhost:9092']
# Określ nazwę tematu, z którego otrzyma wiadomość
nazwa_tematu = 'Pierwszy_Temat'
# Zainicjuj zmienną konsumenta
konsument = KafkaConsumer (topicName, group_id ='group1',bootstrap_servers =
bootstrap_servers)
# Przeczytaj i wydrukuj wiadomość od konsumenta
dla wiadomości w konsumencie:
print("Nazwa tematu=%s,Wiadomość=%s"%(msg.temat,wiadomość.wartość))
# Zakończ skrypt
system.Wyjście()

Wynik:

Uruchom następujące polecenie z jednego terminala, aby wykonać skrypt producenta.

$python3 producent1.py

Następujące dane wyjściowe pojawią się po wysłaniu wiadomości.

Uruchom następujące polecenie z innego terminala, aby wykonać skrypt konsumencki.

$ python3 konsument1.py

Wyjście pokazuje nazwę tematu i wiadomość tekstową wysłaną od producenta.

Odczytywanie danych w formacie JSON z Kafki

Dane w formacie JSON mogą być przesyłane przez producenta Kafki i odczytywane przez konsumenta Kafki za pomocą json moduł Pythona. Jak dane JSON mogą być serializowane i deserializowane przed wysłaniem i odebraniem danych za pomocą modułu python-kafka pokazano w tej części tego samouczka.

Utwórz skrypt Pythona o nazwie producent2.py z następującym skryptem. Inny moduł o nazwie JSON jest importowany z KafkaProducent moduł tutaj. serializator_wartości argument jest używany z bootstrap_servers tutaj argument, aby zainicjalizować obiekt producenta Kafki. Ten argument wskazuje, że dane JSON będą kodowane przy użyciu „utf-8' zestaw znaków w momencie wysyłania. Następnie dane w formacie JSON są wysyłane do tematu o nazwie JSONtopic.

producent2.py:

# Importuj KafkaProducer z biblioteki Kafka
z importu kafki KafkaProducer
# Importuj moduł JSON do serializacji danych
importuj json
# Zainicjuj zmienną producenta i ustaw parametr dla kodowania JSON
producent = KafkaProducer(bootstrap_servers =
['localhost:9092'],value_serializer=lambda v:json.wysypiska(v).koduj('utf-8'))
# Wyślij dane w formacie JSON
producent.send('JSONtopic', 'name': 'fahmida','email':'[email protected]')
 
# Drukuj wiadomość
print("Wiadomość wysłana do JSONtopic")

Utwórz skrypt Pythona o nazwie konsument2.py z następującym skryptem. KafkaConsumer, system a moduły JSON są importowane w tym skrypcie. KafkaConsumer moduł służy do odczytu danych w formacie JSON z Kafka. Moduł JSON służy do dekodowania zakodowanych danych JSON wysyłanych od producenta Kafkaka. Sys moduł służy do zakończenia skryptu. value_deserializer argument jest używany z bootstrap_servers aby zdefiniować sposób dekodowania danych JSON. Kolejny, dla pętla służy do drukowania wszystkich rekordów konsumentów i danych JSON pobranych z Kafki.

konsument2.py:

# Importuj KafkaConsumer z biblioteki Kafka
z importu kafki KafkaConsumer
# Importuj moduł sys
system importu
# Importuj moduł json do serializacji danych
importuj json
# Zainicjuj zmienną konsumenta i ustaw właściwość dla dekodowania JSON
konsument = KafkaConsumer ('JSONtopic',bootstrap_servers = ['localhost:9092'],
value_deserializer=lambda m: json.ładunki (m.dekodować('utf-8')))
# Odczytaj dane z kafki
dla wiadomości w konsumencie:
print("Rekordy konsumentów:\n")
drukuj (wiadomość)
print("\nCzytanie z danych JSON\n")
print("Nazwa:",wiadomość[6]['imię'])
print("E-mail:",wiadomość[6]['e-mail'])
# Zakończ skrypt
system.Wyjście()

Wynik:

Uruchom następujące polecenie z jednego terminala, aby wykonać skrypt producenta.

$ python3 producent2.py

Po przesłaniu danych JSON skrypt wydrukuje następującą wiadomość message.

Uruchom następujące polecenie z innego terminala, aby wykonać skrypt konsumencki.

$ python3 konsument2.py

Następujące dane wyjściowe pojawią się po uruchomieniu skryptu.

Wniosek:

Dane mogą być wysyłane i odbierane w różnych formatach od Kafki za pomocą Pythona. Dane mogą być również przechowywane w bazie danych i pobierane z bazy danych za pomocą Kafki i Pythona. W domu, ten samouczek pomoże użytkownikowi Pythona rozpocząć pracę z Kafką.

Jak zmienić ustawienia myszy i touchpada za pomocą Xinput w systemie Linux?
Większość dystrybucji Linuksa jest domyślnie dostarczana z biblioteką „libinput” do obsługi zdarzeń wejściowych w systemie. Może przetwarzać zdarzenia...
Remap your mouse buttons differently for different software with X-Mouse Button Control
Maybe you need a tool that could make your mouse's control change with every application that you use. If this is the case, you can try out an applica...
Microsoft Sculpt Touch Wireless Mouse Review
I recently read about the Microsoft Sculpt Touch wireless mouse and decided to buy it. After using it for a while, I decided to share my experience wi...