
Kafkaをとりあえず動かそう
ProducerがKafkaに発射して、特定のトピックにデータが投げられて、Consumerで受け取る。ひとまずそれだけをやるチュートリアルです。あとは、provectusさんが作ってるossが便利そうだったので、紹介しようと思います。ossって確かに有名処のサービスのフロントページを作るとかだったら何とかアホでも関われそうな気がするした。。。
【参考】
Apache Kafka in Docker Container and Implement Its Functionalities with Python
https://dzone.com/articles/apache-kafka-python
とりあえずdocker-compose.yml
分かりました、まずdocker-compose.ymlですね、こちらになります。
version: "2"
services:
zookeeper:
container_name: zookeeper
image: docker.io/bitnami/zookeeper:3.7
ports:
- "2181:2181"
volumes:
- "zookeeper_data:/bitnami"
environment:
- ALLOW_ANONYMOUS_LOGIN=yes
kafka:
container_name: kafka
image: docker.io/bitnami/kafka:2
ports:
- "9092:9092"
- "9093:9093"
volumes:
- "kafka_data:/bitnami"
environment:
- ALLOW_PLAINTEXT_LISTENER=yes
- KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT
- KAFKA_CFG_LISTENERS=CLIENT://:9092,EXTERNAL://:9093
- KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka:9092,EXTERNAL://localhost:9093
- KAFKA_INTER_BROKER_LISTENER_NAME=CLIENT
depends_on:
- zookeeper
kafka-ui:
image: provectuslabs/kafka-ui
container_name: kafka-ui
ports:
- "18080:8080"
restart: always
environment:
- KAFKA_CLUSTERS_0_NAME=local
- KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka:9092
- KAFKA_CLUSTERS_0_ZOOKEEPER=zookeeper:2181
depends_on:
- kafka
- zookeeper
volumes:
zookeeper_data:
driver: local
kafka_data:
driver: local
zookeeperとkafkaとkafka-uiのコンテナを立てます。dockerfileは要らないとのことなので、そのままdocker-compose up -dしてしまってください。
Kafkaが立った~
これでkafka立ちました。dockerって凄い。httpの設定なのでhttpsにする場合は他に設定が必要。18080にアクセスするとkafka-uiなるものが表示されます。新しくtopic追加など、、、まあ今の状況がアクセスするだけで見えるのはgui楽だと思います。ソース、reactの部分しかよく分からなかった。。。
データ発射
ではkafkaにproducerからデータを飛ばしましょう。
まずはproducer.py
import datetime
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers='localhost:9093')
try:
for _ in range(200):
the_dt = str(datetime.datetime.utcnow())
val = f"カウントアップ: {_} at {the_dt}".encode(encoding='utf8')
producer.send(topic="hagehoge", value=val)
producer.close()
except Exception as ex:
print(ex)
これでkafka立ちました。dockerって凄い。httpの設定なのでhttpsにする場合は他にも色々設定が必要。
次はconsumer.py
from kafka import KafkaConsumer
consumer = KafkaConsumer('hagehoge', bootstrap_servers='localhost:9093')
for msg in consumer:
topic = msg[0]
value = msg[6]
print(msg)
print(f"{topic}:{value.decode()}")
consumer.pyを起動しておいて、producer.pyを実行すると、kafkaを通して一気にデータが流れますね。めっちゃ早くてよくみれないですが。
以下のようにデータもguiで見れる見たいです。データ数が何百万とかなっても使えるんですかね。。。あとはproucer側でセンサ値とか取るようにして、consumer側でそのままdbとかにぶち込むようにすればリアルタイム✖️大量データストリーミングがいけるみたいです。