Quick tip: Using Apache Spark and GraphFrames with SingleStore Notebooks


Abstract

In this article, we’ll see how to use the Apache Spark GraphFrames package with SingleStore by using historical data about the London Underground network. We’ll store the data as stations (vertices) and line connections (edges) in SingleStore and then load the data into a notebook environment and perform some queries on the data using GraphFrames.

The notebook file used in this article is available on GitHub.

Create a SingleStore Cloud account

A previous article showed the steps to create a free SingleStore Cloud account. We’ll use the following settings:

Workspace Group Name: Spark Demo Group

Cloud Provider: AWS

Region: US East 1 (N. Virginia)

Workspace Name: spark-demo

Size: S-00

Create a new notebook

From the left navigation pane in the cloud portal, we’ll select Develop > Notebooks.

In the top right of the web page, we’ll select New Notebook > New Notebook, as shown in Figure 1.

Figure 1. New Notebook.

We’ll call the notebook spark_graphframes_demo, select a Blank notebook template from the available options, and save it in the Personal location.

Fill out the notebook

First, let’s install Spark:

!pip cache purge --quiet
!conda install -y --quiet -c conda-forge openjdk pyspark

Next, we’ll install some libraries:

!pip install folium --quiet
!pip install graphframes --quiet

In previous articles in our Apache Spark series using notebooks, we've downloaded jar files into a local directory. In this article, we'll instead list the Maven coordinates of the required packages in spark.jars.packages, so that Spark downloads them (and their dependencies) when it creates our SparkSession:

from pyspark.sql import SparkSession

# List of Maven coordinates for all required packages
maven_packages = [
    "graphframes:graphframes:0.8.3-spark3.5-s_2.12",
    "org.scala-lang:scala-library:2.12",
    "com.singlestore:singlestore-jdbc-client:1.2.1",
    "com.singlestore:singlestore-spark-connector_2.12:4.1.5-spark-3.5.0",
    "org.apache.commons:commons-dbcp2:2.12.0",
    "org.apache.commons:commons-pool2:2.12.0",
    "io.spray:spray-json_3:1.3.6"
]

# Create Spark session with all required packages
spark = (SparkSession
    .builder
    .config("spark.jars.packages", ",".join(maven_packages))
    .appName("Spark GraphFrames Test")
    .getOrCreate()
)

# Only show errors in the notebook output
spark.sparkContext.setLogLevel("ERROR")
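The spark.jars.packages setting expects a single comma-separated string of Maven coordinates, each normally of the form groupId:artifactId:version, which is why the list is passed through ",".join(). As a minimal sketch using only the standard library, the hypothetical helper below builds that string and rejects entries that are not in the three-part form. It is a sanity check of our own, not part of PySpark, and Maven also allows longer coordinates (for example with a classifier) that this simplified check would reject.

```python
def build_packages_value(coordinates):
    """Hypothetical helper: validate Maven coordinates and join them with commas.

    Only the three-part groupId:artifactId:version form is accepted here.
    """
    for coord in coordinates:
        parts = coord.split(":")
        # Each coordinate should have three non-empty parts
        if len(parts) != 3 or not all(parts):
            raise ValueError(f"Not a groupId:artifactId:version coordinate: {coord!r}")
    # spark.jars.packages takes one comma-separated string
    return ",".join(coordinates)


example_packages = [
    "graphframes:graphframes:0.8.3-spark3.5-s_2.12",
    "com.singlestore:singlestore-spark-connector_2.12:4.1.5-spark-3.5.0",
]

value = build_packages_value(example_packages)
print(value)
```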

Now we’ll create the connection details to SingleStore:

host = "<HOST>"
password = "<PASSWORD>"
port = "3306"
cluster = host + ":" + port

Replace <HOST> and <PASSWORD> with the values for your environment. These values can be obtained from the workspace using Connect > SQL IDE.

We also need to set some configuration parameters:

spark.conf.set("spark.datasource.singlestore.ddlEndpoint", cluster)
spark.conf.set("spark.datasource.singlestore.user", "admin")
spark.conf.set("spark.datasource.singlestore.password", password)
spark.conf.set("spark.datasource.singlestore.disablePushdown", "false")
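The four spark.conf.set calls can also be driven from a single dictionary, which keeps the option names in one place. This is a minimal sketch: make_singlestore_options and apply_options are hypothetical helpers, <HOST> and <PASSWORD> remain placeholders, and in the notebook we would pass spark.conf.set as the setter. Here a plain dictionary stands in for the Spark configuration so the snippet runs without Spark.

```python
def make_singlestore_options(host, password, port=3306, user="admin"):
    """Hypothetical helper: build the SingleStore connector options used in this article."""
    return {
        # The DDL endpoint is "host:port"
        "spark.datasource.singlestore.ddlEndpoint": f"{host}:{port}",
        "spark.datasource.singlestore.user": user,
        "spark.datasource.singlestore.password": password,
        # Passed as the string "false", as in the article
        "spark.datasource.singlestore.disablePushdown": "false",
    }


def apply_options(conf_set, options):
    """Hypothetical helper: call a setter such as spark.conf.set once per option."""
    for key, val in options.items():
        conf_set(key, val)


# A plain dict stands in for spark.conf so the sketch runs without Spark
fake_conf = {}
apply_options(fake_conf.__setitem__, make_singlestore_options("<HOST>", "<PASSWORD>"))
print(fake_conf["spark.datasource.singlestore.ddlEndpoint"])  # <HOST>:3306
```

In the notebook, the equivalent call would be apply_options(spark.conf.set, make_singlestore_options(host, password)).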

A database is required, so we’ll create one:

DROP DATABASE IF EXISTS spark_demo;
CREATE DATABASE IF NOT EXISTS spark_demo;

We’ll also create tables to store the data:

USE spark_demo;

DROP TABLE IF EXISTS connections;
CREATE ROWSTORE TABLE IF NOT EXISTS connections (
    src INT,
    dst INT,
    line VARCHAR(32),
    colour VARCHAR(8),
    time INT,
    PRIMARY KEY(src, dst, line)
);

DROP TABLE IF EXISTS stations;
CREATE ROWSTORE TABLE IF NOT EXISTS stations (
    id INT PRIMARY KEY,
    latitude DOUBLE,
    longitude DOUBLE,
    name VARCHAR(32),
    zone FLOAT,
    total_lines INT,
    rail INT
);
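The composite primary key (src, dst, line) on the connections table lets the same pair of stations appear once per line, while rejecting an exact duplicate. To illustrate the behaviour without a SingleStore workspace, the sketch below uses Python's built-in sqlite3 module as a stand-in, with a simplified table definition (SQLite has no ROWSTORE keyword) and hypothetical station ids.

```python
import sqlite3

# In-memory SQLite database as a stand-in for SingleStore
conn = sqlite3.connect(":memory:")

# Simplified connections table: SQLite has no ROWSTORE keyword
conn.execute("""
    CREATE TABLE connections (
        src INT,
        dst INT,
        line VARCHAR(32),
        colour VARCHAR(8),
        time INT,
        PRIMARY KEY (src, dst, line)
    )
""")

insert_sql = "INSERT INTO connections VALUES (?, ?, ?, ?, ?)"

# Hypothetical stations 1 and 2 linked by two different lines: both rows are accepted
conn.execute(insert_sql, (1, 2, "Northern Line", "#000000", 2))
conn.execute(insert_sql, (1, 2, "Central Line", "#E32017", 2))

# An exact (src, dst, line) duplicate violates the primary key
rejected = False
try:
    conn.execute(insert_sql, (1, 2, "Northern Line", "#000000", 3))
except sqlite3.IntegrityError:
    rejected = True

rows = conn.execute("SELECT COUNT(*) FROM connections").fetchone()[0]
print(rows, rejected)  # 2 True
```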

Now we'll download the historical London Underground data into pandas DataFrames. We'll also rename some columns to suit GraphFrames, which expects edges to have src and dst columns, and merge the line names and colours into the connections data so that it mirrors the schema of the tables we defined above:

import pandas as pd

connections_url = "https://raw.githubusercontent.com/VeryFatBoy/singlestore-geospatial-example/main/csv/london_connections.csv"
stations_url = "https://raw.githubusercontent.com/VeryFatBoy/singlestore-geospatial-example/main/csv/london_stations.csv"
lines_url = "https://raw.githubusercontent.com/VeryFatBoy/singlestore-geospatial-example/main/csv/london_lines.csv"

# GraphFrames expects edge endpoints in columns named src and dst
connections_df = pd.read_csv(connections_url)
connections_df.rename(
    columns = {"station1": "src", "station2": "dst"},
    inplace = True
)

# display_name is not part of the stations table
stations_df = pd.read_csv(stations_url)
stations_df.drop(
    "display_name",
    axis = 1,
    inplace = True
)

# stripe is not needed; keep each line's id, name and colour
lines_df = pd.read_csv(lines_url)
lines_df.drop(
    "stripe",
    axis = 1,
    inplace = True
)

# Replace each connection's line id with the line's name and colour
connections_df = pd.merge(
    connections_df,
    lines_df,
    on = "line",
    how = "left"
)
connections_df.drop(
    "line",
    axis = 1,
    inplace = True
)
connections_df.rename(
    columns = {"name": "line"},
    inplace = True
)
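The rename, drop and merge steps replace each connection's line id with the line's name and colour, and give the edge endpoints the src and dst names that GraphFrames expects. The standard-library sketch below mirrors that left join on a couple of in-memory rows so the logic can be followed without downloading the files. The column names follow the code above, but the row values, including the line ids 1 and 9 and the stripe entries, are hypothetical.

```python
import csv
import io

# Hypothetical sample rows in the same shape as the CSV files
connections_csv = "station1,station2,line,time\n7,145,9,2\n11,163,1,1\n"
lines_csv = "line,name,colour,stripe\n1,Bakerloo Line,#B36305,NULL\n9,Northern Line,#000000,NULL\n"

# Index the lines by their id, like the right-hand side of the merge
lines = {row["line"]: row for row in csv.DictReader(io.StringIO(lines_csv))}

edges = []
for row in csv.DictReader(io.StringIO(connections_csv)):
    line = lines.get(row["line"], {})  # left join: an unmatched id gives None values
    edges.append({
        "src": int(row["station1"]),   # renamed from station1
        "dst": int(row["station2"]),   # renamed from station2
        "line": line.get("name"),      # line name replaces the line id
        "colour": line.get("colour"),
        "time": int(row["time"]),
        # stripe is dropped, as in the pandas code
    })

for edge in edges:
    print(edge)
```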

Just before we save our data into SingleStore, we’ll create a map of the London Underground using Folium:

import folium

# Centre the map on central London (latitude, longitude)
London = [51.509865, -0.118092]
mymap = folium.Map(location = London, zoom_start = 12)

# Add markers for stations
for idx, row in stations_df.iterrows():
    folium.Marker(
        [row["latitude"], row["longitude"]],
        popup = row["name"]
    ).add_to(mymap)

# Add lines with colours
for idx, row in connections_df.iterrows():
    source = stations_df.loc[stations_df["id"] == row["src"]]
    target = stations_df.loc[stations_df["id"] == row["dst"]]

    # Extract latitude and longitude
    source_coords = (float(source["latitude"].iloc[0]), float(source["longitude"].iloc[0]))
    target_coords = (float(target["latitude"].iloc[0]), float(target["longitude"].iloc[0]))

    folium.PolyLine(
        locations = [source_coords, target_coords],
        color = row["colour"]
    ).add_to(mymap)

mymap

This produces a map, as shown in Figure 2, which we can scroll and zoom. Clicking a marker shows the station name, and the lines are coloured according to the London Underground colour scheme.

Figure 2. Map using Folium.
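The map loop filters stations_df twice for every connection. An alternative, sketched here with the standard library rather than taken from the article's code, is to index the station coordinates by id once and then look up each edge's endpoints; the resulting coordinate pairs are what folium.PolyLine receives. The two stations use the ids and coordinates from the vertex output shown later in this article, while the edge and its colour are hypothetical.

```python
# Station ids and coordinates taken from the sample vertex output in this article
stations = [
    {"id": 39, "latitude": 51.5481, "longitude": -0.1188},
    {"id": 145, "latitude": 51.5308, "longitude": -0.1238},
]

# Index coordinates by station id once: a dict lookup per edge instead of a filter
coords_by_id = {s["id"]: (s["latitude"], s["longitude"]) for s in stations}

# Hypothetical edge list in the same shape as connections_df
edges = [{"src": 145, "dst": 39, "colour": "#000000"}]

segments = []
for edge in edges:
    source = coords_by_id.get(edge["src"])
    target = coords_by_id.get(edge["dst"])
    if source is None or target is None:
        continue  # skip edges whose stations are missing instead of raising
    segments.append(([source, target], edge["colour"]))

# In the notebook, each segment would become
# folium.PolyLine(locations=locations, color=colour).add_to(mymap)
for locations, colour in segments:
    print(locations, colour)
```

Unlike the .iloc[0] lookups above, which raise an IndexError when a station id is missing, this version skips such edges; whether to skip or fail loudly is a design choice.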

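We'll now prepare the connection to SingleStore. In SingleStore Notebooks, connection_url is a predefined variable for the workspace and database selected in the notebook:

from sqlalchemy import create_engine

# connection_url is provided by the SingleStore Notebooks environment
db_connection = create_engine(connection_url)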

and write the connections data:

connections_df.to_sql(
    "connections",
    con = db_connection,
    if_exists = "append",
    index = False,
    chunksize = 1000
)

and stations data:

stations_df.to_sql(
    "stations",
    con = db_connection,
    if_exists = "append",
    index = False,
    chunksize = 1000
)
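With chunksize = 1000, pandas writes the rows in batches of at most 1,000 rather than all at once. The sketch below reproduces that batching idea with the standard library, using sqlite3 as a stand-in for SingleStore and a hypothetical write_in_chunks helper; it illustrates the concept and is not how pandas implements to_sql internally.

```python
import sqlite3
from itertools import islice


def write_in_chunks(conn, table, rows, chunksize=1000):
    """Hypothetical helper: insert rows with executemany, at most chunksize per batch.

    Returns the size of each batch written. The table name comes from
    trusted code here and is never taken from user input.
    """
    it = iter(rows)
    batch_sizes = []
    while True:
        batch = list(islice(it, chunksize))
        if not batch:
            break
        placeholders = ", ".join("?" * len(batch[0]))  # e.g. "?, ?" for two columns
        conn.executemany(f"INSERT INTO {table} VALUES ({placeholders})", batch)
        batch_sizes.append(len(batch))
    conn.commit()
    return batch_sizes


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE stations (id INT PRIMARY KEY, name TEXT)")

# Hypothetical rows: 2,500 stations are written as batches of 1000, 1000 and 500
rows = [(i, f"station {i}") for i in range(2500)]
sizes = write_in_chunks(conn, "stations", rows)
print(sizes)  # [1000, 1000, 500]
```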

We can check the data in the connections table:

SELECT * FROM connections LIMIT 5;

and stations table:

SELECT * FROM stations LIMIT 5;

With the data safely stored in SingleStore, we can now read it back into Spark and use GraphFrames. First the connections data:

connections = (spark.read
    .format("singlestore")
    .load("spark_demo.connections")
)

and then the stations data:

stations = (spark.read
    .format("singlestore")
    .load("spark_demo.stations")
)

Now we’ll create a GraphFrame:

from graphframes import GraphFrame

underground = GraphFrame(stations, connections)
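GraphFrame(stations, connections) relies on the naming convention we prepared earlier: the vertex DataFrame needs an id column and the edge DataFrame needs src and dst columns. A small pre-flight check can catch a missing column, or an edge that points to a station absent from the vertices, before we start querying. The sketch below works on plain Python lists; in the notebook, the column lists could come from stations.columns and connections.columns. The check_graph_inputs helper and the sample ids are hypothetical.

```python
def check_graph_inputs(vertex_cols, edge_cols, vertex_ids, edge_pairs):
    """Hypothetical helper: return a list of problems found (empty if none)."""
    problems = []
    # GraphFrames looks for an "id" column on the vertices
    if "id" not in vertex_cols:
        problems.append("vertices need an 'id' column")
    # ...and for "src" and "dst" columns on the edges
    for col in ("src", "dst"):
        if col not in edge_cols:
            problems.append(f"edges need a '{col}' column")
    # Edges whose endpoints are not known vertices ("dangling" edges)
    known = set(vertex_ids)
    for src, dst in edge_pairs:
        if src not in known or dst not in known:
            problems.append(f"edge {src}->{dst} references an unknown station")
    return problems


vertex_cols = ["id", "latitude", "longitude", "name", "zone", "total_lines", "rail"]
edge_cols = ["src", "dst", "line", "colour", "time"]

# Hypothetical ids: station 163 is deliberately missing from the vertices
problems = check_graph_inputs(vertex_cols, edge_cols, [7, 11, 145], [(7, 145), (11, 163)])
print(problems)  # ['edge 11->163 references an unknown station']
```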

We can show the vertices:

underground.vertices.show(5)

Example output:

+---+--------+---------+---------------+----+-----------+----+
| id|latitude|longitude|           name|zone|total_lines|rail|
+---+--------+---------+---------------+----+-----------+----+
| 25|  51.512|  -0.1031|    Blackfriars| 1.0|          2|   0|
| 39| 51.5481|  -0.1188|Caledonian Road| 2.0|          1|   0|
| 43| 51.5147|   0.0082|   Canning Town| 3.0|          2|   0|
| 50| 51.7052|   -0.611|        Chesham|10.0|          1|   0|
| 60| 51.5129|  -0.1243|  Covent Garden| 1.0|          1|   0|
+---+--------+---------+---------------+----+-----------+----+
only showing top 5 rows

We can show the edges:

underground.edges.show(5)

Example output:

+---+---+--------------------+-------+----+
|src|dst|                line| colour|time|
+---+---+--------------------+-------+----+
|  7|145|       Northern Line|#000000|   2|
| 11|163|       Bakerloo Line|#B36305|   1|
| 19| 97|Docklands Light R...|#00A4A7|   2|
| 28|192|        Central Line|#E32017|   1|
| 49|151|       Northern Line|#000000|   2|
+---+---+--------------------+-------+----+
only showing top 5 rows

We can check how many stations are in each London Underground zone:

(underground
    .vertices
    .groupBy("zone")
    .count()
    .orderBy("count", ascending = False)
    .show()
)

Example output:

+----+-----+
|zone|count|
+----+-----+
| 2.0|   75|
| 1.0|   60|
| 3.0|   47|
| 4.0|   38|
| 5.0|   28|
| 6.0|   18|
| 2.5|   17|
| 3.5|    6|
| 1.5|    4|
| 8.0|    2|
|10.0|    2|
| 7.0|    2|
| 9.0|    1|
| 6.5|    1|
| 5.5|    1|
+----+-----+
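groupBy("zone").count() returns one row per distinct zone together with the number of stations in it, and orderBy("count", ascending = False) sorts those rows from largest to smallest. The same aggregation can be sketched with collections.Counter; the zone values below are a small hypothetical sample, not the real dataset.

```python
from collections import Counter

# Hypothetical sample of the zone column
zones = [1.0, 2.0, 2.0, 3.0, 2.5, 1.0, 2.0]

# Equivalent of groupBy("zone").count().orderBy("count", ascending=False)
zone_counts = Counter(zones).most_common()

print("zone count")
for zone, count in zone_counts:
    print(f"{zone:>4} {count:>5}")
```

Counter.most_common() keeps tied zones in the order they were first seen, whereas Spark does not guarantee any particular order for rows with equal counts, so tied rows (such as the three zones with a count of 2 above) may appear in a different order between runs.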

It may be useful to find the number of connections (edges) on a particular line, for example the District Line:

(underground
    .edges
    .filter("line = 'District Line'")
    .count()
)

Example output:

59
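Each edge is a connection between two adjacent stations, so filtering the edges and counting them gives the number of District Line connections rather than the number of distinct stations on the line. The standard-library sketch below, on a hypothetical edge list, computes both values to show the difference: the distinct stations come from the set of src and dst values on that line.

```python
# Hypothetical edges: (src, dst, line)
edges = [
    (1, 2, "District Line"),
    (2, 3, "District Line"),
    (3, 4, "District Line"),
    (2, 5, "Central Line"),
]

district = [(src, dst) for src, dst, line in edges if line == "District Line"]

# What .filter(...).count() on the edges returns
connection_count = len(district)
# Distinct stations served by the line: every src and dst value, deduplicated
station_count = len({station for pair in district for station in pair})

print(connection_count, station_count)  # 3 4
```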

It could be interesting to know the maximum number of lines running through a station:

(underground
    .vertices
    .groupBy()
    .max("total_lines")
    .show()
)

Example output:

+----------------+
|max(total_lines)|
+----------------+
|               6|
+----------------+

and to find the station with the most lines running through it:

(underground
    .vertices
    .filter("total_lines == 6")
    .show()
)

Example output:

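+---+--------+---------+--------------------+----+-----------+----+
| id|latitude|longitude|                name|zone|total_lines|rail|
+---+--------+---------+--------------------+----+-----------+----+
|145| 51.5308|  -0.1238|King's Cross St. ...| 1.0|          6|   1|
+---+--------+---------+--------------------+----+-----------+----+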
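The two queries above find the maximum value of total_lines and then filter on the hard-coded value 6, which would need updating if the data changed. A sketch of a single pass that finds the maximum and every station reaching it (so ties are kept) is shown below on hypothetical rows; in Spark, a comparable approach would be to compute the maximum first and use the result in the filter rather than a literal.

```python
def stations_with_most_lines(stations):
    """Return (max_lines, names) in one pass over (name, total_lines) pairs."""
    best = None
    names = []
    for name, total_lines in stations:
        if best is None or total_lines > best:
            best, names = total_lines, [name]  # new maximum: start a fresh list
        elif total_lines == best:
            names.append(name)                 # tie: keep every station at the maximum
    return best, names


# Hypothetical sample rows
sample = [("Station A", 2), ("Station B", 6), ("Station C", 1), ("Station D", 6)]
print(stations_with_most_lines(sample))  # (6, ['Station B', 'Station D'])
```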

Many more types of queries are possible. The GraphFrames documentation is a good place to start.
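One example is path finding: GraphFrames provides a bfs method for breadth-first search between vertices. To show what such a search computes, the standard-library sketch below finds a route with the fewest hops over an edge list shaped like connections. It treats each connection as usable in both directions and ignores the time column; both are simplifying assumptions, since GraphFrames treats edges as directed, and the station ids are hypothetical.

```python
from collections import deque


def fewest_hops(edges, start, goal):
    """Breadth-first search: return a path with the fewest hops, or None."""
    # Build an undirected adjacency list from (src, dst) pairs
    neighbours = {}
    for src, dst in edges:
        neighbours.setdefault(src, []).append(dst)
        neighbours.setdefault(dst, []).append(src)

    previous = {start: None}  # also records which stations have been visited
    queue = deque([start])
    while queue:
        node = queue.popleft()
        if node == goal:
            # Walk back through the predecessors to rebuild the path
            path = []
            while node is not None:
                path.append(node)
                node = previous[node]
            return path[::-1]
        for nxt in neighbours.get(node, []):
            if nxt not in previous:
                previous[nxt] = node
                queue.append(nxt)
    return None


# Hypothetical station ids
edges = [(1, 2), (2, 3), (3, 4), (1, 5), (5, 4)]
print(fewest_hops(edges, 1, 4))  # [1, 5, 4]
```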

Finally, we can stop Spark:

spark.stop()

Summary

In this short article, we've seen how easily we can store graph data in SingleStore and how we can use GraphFrames to run a variety of queries on it.
