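"""spark_stream.py

Spark Structured Streaming job that reads user records from the Kafka topic
'users_created' and writes them into the Cassandra table
spark_streaming.created_users.
"""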
import logging
import traceback

from cassandra.cluster import Cluster
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StructField, StringType
# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[logging.StreamHandler()]
)
logger = logging.getLogger(__name__)

def create_keyspace(session):
    session.execute("""
        CREATE KEYSPACE IF NOT EXISTS spark_streaming
        WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1}
    """)
    logger.info("Keyspace created successfully")

def create_table(session):
    session.execute("""
        CREATE TABLE IF NOT EXISTS spark_streaming.created_users (
            first_name TEXT,
            last_name TEXT,
            gender TEXT,
            address TEXT,
            post_code TEXT,
            email TEXT,
            username TEXT PRIMARY KEY,
            dob TEXT,
            registered_date TEXT,
            phone TEXT,
            picture TEXT);
    """)
    logger.info("Table created successfully")

def insert_data(session, **kwargs):
    logger.info("Inserting data")

    first_name = kwargs.get('first_name')
    last_name = kwargs.get('last_name')
    gender = kwargs.get('gender')
    address = kwargs.get('address')
    postcode = kwargs.get('post_code')
    email = kwargs.get('email')
    username = kwargs.get('username')
    dob = kwargs.get('dob')
    registered_date = kwargs.get('registered_date')
    phone = kwargs.get('phone')
    picture = kwargs.get('picture')

    try:
        session.execute("""
            INSERT INTO spark_streaming.created_users (first_name, last_name, gender, address,
                post_code, email, username, dob, registered_date, phone, picture)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
        """, (first_name, last_name, gender, address,
              postcode, email, username, dob, registered_date, phone, picture))
        logger.info(f"Data inserted for {first_name} {last_name}")
    except Exception as e:
        logger.error(f"Error while inserting data: {e}")

def create_spark_connection():
    s_conn = None
    try:
        s_conn = SparkSession.builder \
            .appName('SparkDataStreaming') \
            .config('spark.jars.packages',
                    "com.datastax.spark:spark-cassandra-connector_2.12:3.5.1,"
                    "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1") \
            .config('spark.cassandra.connection.host', 'cassandra_db') \
            .getOrCreate()
        s_conn.sparkContext.setLogLevel("ERROR")
        logger.info("Spark connection created successfully")
    except Exception as e:
        logger.error(f"Error while creating spark connection: {e}")
    return s_conn
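
# Note: the connector coordinates above ("_2.12" artifacts at version 3.5.1) assume a
# Spark 3.5.x runtime built for Scala 2.12; adjust them if your Spark build differs.
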
def connect_to_kafka(spark_conn):
    spark_df = None
    try:
        spark_df = spark_conn.readStream \
            .format('kafka') \
            .option('kafka.bootstrap.servers', 'broker:9092') \
            .option('subscribe', 'users_created') \
            .option('startingOffsets', 'earliest') \
            .load()
        logger.info("Kafka dataframe created successfully")
    except Exception as e:
        logger.error(f"Kafka dataframe could not be created because: {e}")
        logger.error(f"Error type: {type(e).__name__}")
        logger.error(f"Traceback: {traceback.format_exc()}")
    if spark_df is None:
        logger.error("Failed to create Kafka dataframe")
    return spark_df

def create_cassandra_connection():
    try:
        # Connection to Cassandra cluster
        cluster = Cluster(['cassandra_db'])
        cas_session = cluster.connect()
        logger.info("Cassandra connection created successfully")
        return cas_session
    except Exception as e:
        logger.error(f"Error while creating Cassandra connection: {e}")
        return None
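
# 'cassandra_db' and 'broker' above are assumed to be hostnames resolvable from wherever
# this job runs (e.g. service names on a shared Docker network); substitute real
# hosts/IPs when running elsewhere.
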
def create_selection_df_from_kafka(spark_df):
    schema = StructType([
        StructField("first_name", StringType(), False),
        StructField("last_name", StringType(), False),
        StructField("gender", StringType(), False),
        StructField("address", StringType(), False),
        StructField("post_code", StringType(), False),
        StructField("email", StringType(), False),
        StructField("username", StringType(), False),
        StructField("dob", StringType(), False),
        StructField("registered_date", StringType(), False),
        StructField("phone", StringType(), False),
        StructField("picture", StringType(), False)
    ])

    sel = spark_df.selectExpr("CAST(value AS STRING)") \
        .select(from_json(col("value"), schema).alias("data")) \
        .select("data.*")
    logger.info("Selection dataframe created successfully")
    return sel
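
# Illustrative shape of a message on the 'users_created' topic that the schema above
# expects (field values here are made up for the example):
# {"first_name": "Jane", "last_name": "Doe", "gender": "female", "address": "10 Example St",
#  "post_code": "12345", "email": "jane@example.com", "username": "janedoe",
#  "dob": "1990-01-01", "registered_date": "2024-01-01", "phone": "555-0100",
#  "picture": "https://example.com/jane.jpg"}
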

if __name__ == "__main__":
    # Create Spark connection
    spark_conn = create_spark_connection()

    if spark_conn is not None:
        # Create connection to Kafka with Spark
        spark_df = connect_to_kafka(spark_conn)

        if spark_df is not None:
            selection_df = create_selection_df_from_kafka(spark_df)
            logger.info("Selection dataframe schema:")
            selection_df.printSchema()

            # Create Cassandra connection
            session = create_cassandra_connection()
            if session is not None:
                create_keyspace(session)
                create_table(session)

                # Stream the parsed records into Cassandra
                streaming_query = selection_df.writeStream \
                    .format("org.apache.spark.sql.cassandra") \
                    .option('keyspace', 'spark_streaming') \
                    .option('table', 'created_users') \
                    .option('checkpointLocation', '/tmp/checkpoint') \
                    .start()

                streaming_query.awaitTermination()
                session.shutdown()
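
# Sketch of how this job might be launched (assuming the Kafka and Cassandra containers
# are reachable under the hostnames above); spark.jars.packages is already set in
# create_spark_connection(), so a plain submit should suffice:
#
#   spark-submit --master local[*] spark_stream.py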