Troubleshooting Kafka Connectivity with Spark Streaming


I am currently developing a mini project that integrates Jupyter and Kafka with Spark Streaming for data processing, and uses Cassandra for storage. For visualization and alert management, I am using Grafana. The project also includes a feature that displays randomly generated temperatures from various regions. Everything is deployed on Docker. However, I am running into an issue with the initial connection between the producer and the consumer, and I would appreciate some help.
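Since everything runs in Docker, the first thing worth ruling out is basic reachability: the Jupyter container must share a Docker network with the broker, and the broker's advertised listeners must resolve to an address the client can reach (kafka:9092 here). A minimal sketch of that check with kafka-python, assuming the broker's compose service is named kafka as in the code below:

from kafka import KafkaConsumer
from kafka.errors import NoBrokersAvailable

# Quick reachability probe from the Jupyter container.
# Assumption: the broker's Docker service is named "kafka" and both
# containers are on the same Docker network.
try:
    probe = KafkaConsumer(bootstrap_servers=["kafka:9092"])
    print("Broker reachable. Topics:", probe.topics())
    probe.close()
except NoBrokersAvailable:
    print("No broker at kafka:9092 - check the Docker network and the "
          "broker's KAFKA_ADVERTISED_LISTENERS setting.")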

Producer

# Install the kafka-python library if necessary
!pip install kafka-python

import json
import random
import time
import threading
from kafka import KafkaProducer
from IPython.display import display
import ipywidgets as widgets

# Kafka producer configuration
producer = KafkaProducer(
    bootstrap_servers=['kafka:9092'],  # adjust the address if necessary
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

# Generate simulated sensor readings
def generate_sensor_data():
    return {
        "sensor_id": random.randint(1, 100),
        "timestamp": int(time.time() * 1000),
        "temperature": round(random.uniform(20.0, 35.0), 2),
        "humidity": round(random.uniform(40.0, 60.0), 2),
        "pressure": round(random.uniform(970.0, 1030.0), 2)
    }

# Send one reading per second until the stop event is set
def send_sensor_data(stop_event):
    while not stop_event.is_set():
        data = generate_sensor_data()
        producer.send('temperature', value=data)
        print(f"Data sent: {data}")
        time.sleep(1)

stop_event = threading.Event()
thread = None

# Widgets to control the data stream
start_button = widgets.Button(description="Start Sending Data")
stop_button = widgets.Button(description="Stop Sending Data")
output = widgets.Output()

display(start_button, stop_button, output)

def start_sending_data(b):
    with output:
        global thread
        if thread is None or not thread.is_alive():
            stop_event.clear()  # reset the event so sending can restart after a stop
            thread = threading.Thread(target=send_sensor_data, args=(stop_event,))
            thread.start()
            print("Started sending data...")

def stop_sending_data(b):
    with output:
        if thread is not None and thread.is_alive():
            stop_event.set()
            thread.join()
            print("Stopped sending data.")

start_button.on_click(start_sending_data)
stop_button.on_click(stop_sending_data)
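Before involving Spark at all, it can help to confirm that records actually land on the temperature topic with a plain kafka-python consumer. A minimal sketch, reusing the broker address and topic name from the producer above:

from kafka import KafkaConsumer
import json

# Read a few records from the "temperature" topic to confirm the
# producer side works, independently of Spark.
consumer = KafkaConsumer(
    'temperature',
    bootstrap_servers=['kafka:9092'],
    auto_offset_reset='earliest',
    consumer_timeout_ms=10000,  # stop iterating after 10 s without messages
    value_deserializer=lambda v: json.loads(v.decode('utf-8'))
)
for i, message in enumerate(consumer):
    print(message.value)
    if i >= 4:  # a handful of records is enough to confirm delivery
        break
consumer.close()

If this prints nothing, the problem is on the producer/broker side rather than in the Spark consumer.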

Consumer

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, to_timestamp, from_unixtime, window, avg, min, max
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, LongType

# Initialize the SparkSession with Cassandra integration
spark = (SparkSession
    .builder
    .appName("Weather Data Streaming")
    .config("spark.sql.extensions", "com.datastax.spark.connector.CassandraSparkExtensions")
    .config("spark.cassandra.connection.host", "localhost")  # inside Docker this should usually be the Cassandra service name, not localhost
    .getOrCreate())

# Read streaming messages from Kafka
df = (spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("subscribe", "temperature")
    .option("startingOffsets", "earliest")
    .load())

# Schema of the JSON payload received from Kafka
schema = StructType([
    StructField("sensor_id", StringType()),
    StructField("timestamp", LongType()),  # milliseconds since the UNIX epoch
    StructField("temperature", DoubleType()),
    StructField("humidity", DoubleType()),
    StructField("pressure", DoubleType())
])

# Turn the raw Kafka records into a structured DataFrame
weather_data = df.select(
    from_json(col("value").cast("string"), schema).alias("data")
).select(
    "data.sensor_id",
    to_timestamp(from_unixtime(col("data.timestamp") / 1000)).alias("timestamp"),
    "data.temperature",
    "data.humidity",
    "data.pressure"
)

# Temperature statistics per sensor over hourly windows
weather_stats = (weather_data
    .groupBy(
        window(col("timestamp"), "1 hour"),
        col("sensor_id")
    )
    .agg(
        avg("temperature").alias("avg_temp"),
        min("temperature").alias("min_temp"),
        max("temperature").alias("max_temp")
    ))

# Write each micro-batch of results to Cassandra
def write_to_cassandra(batch_df, batch_id):
    (batch_df.write
        .format("org.apache.spark.sql.cassandra")
        .mode("append")
        .options(table="weather", keyspace="weatherSensors")
        .save())

# Configure the stream to write to Cassandra
query = (weather_stats
    .writeStream
    .outputMode("complete")
    .foreachBatch(write_to_cassandra)
    .start())

query.awaitTermination()
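One common reason the Spark side never connects is a missing connector: the Kafka source and the Cassandra connector are not bundled with Spark and must be on the classpath when the session is created. A hedged sketch of both the package configuration and a console sink for debugging; the connector versions are placeholders to match your Spark/Scala build, and weather_data refers to the DataFrame defined above:

# Assumption: versions below must match your Spark/Scala build (here 3.5.0 / 2.12)
spark = (SparkSession
    .builder
    .appName("Weather Data Streaming")
    .config("spark.jars.packages",
            "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0,"
            "com.datastax.spark:spark-cassandra-connector_2.12:3.5.0")
    .config("spark.sql.extensions", "com.datastax.spark.connector.CassandraSparkExtensions")
    .config("spark.cassandra.connection.host", "cassandra")  # Docker service name
    .getOrCreate())

# Debug sink: print parsed records to the console instead of Cassandra,
# to verify the Kafka leg of the pipeline in isolation.
debug_query = (weather_data
    .writeStream
    .outputMode("append")
    .format("console")
    .option("truncate", "false")
    .start())
debug_query.awaitTermination()

If the console sink shows rows, Kafka connectivity is fine and the issue lies in the Cassandra write; if it shows nothing, revisit the broker address and the advertised listeners.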
