
Final Project: Stocks Streaming Data Processing

Authors: Roman Mutel and Ivan Shevchenko

📝 Description

For our final project in the ✨Big Data Processing course✨, we decided to work with the Finnhub API for live stock market and ForEx data. We wanted to find a less common API to be unique; however, the search turned out to be non-trivial, and we eventually ended up implementing the "crypto" project requirements on top of an alternative API (one that also features stocks in addition to cryptocurrencies). Our primary interest was the stocks of the MAANG companies (Meta, Apple, Amazon, Netflix, Google) and some other important players such as Tesla, Nvidia, Intel, AMD, IBM, and Salesforce. Note that the NASDAQ market operates Monday to Friday from 4 A.M. to 8 P.M., so on weekends the trading volumes will be zero. Alternatively, you can switch to processing the 24/7 cryptocurrency markets by changing the constant NASDAQ_STOCKS on line 24 in ./websocket_to_kafka/producer.py to the predefined CRYPTO_PAIRS.

📜 Design

🗺️ Schema

system schema

As our main building block, we used the Apache Spark Structured Streaming API to continuously transform the streaming data from the Finnhub API in parallel (and to practice the skills gained in the course). The data from the Finnhub API is first parsed by our websocket_to_kafka producer and forwarded to the input Kafka topic. Then, an auxiliary Spark Streaming application explodes the data so that each row contains a single trade, partitions it by the stock symbol, and forwards it into the partitioned_input topic to allow better parallelism at the main processing stage. Our main processing Spark application reads the data from partitioned_input, performs the needed transformations, and either outputs it into dedicated Kafka topics for further insertion into the Redis and Postgres databases or writes it directly into Redis or Cassandra. The processed data can then be retrieved using our API, which aggregates all three data storage systems into a single user interface.
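
To illustrate the partitioning step, here is a minimal PySpark sketch of such an auxiliary application; the topic names follow the schema above, while the message layout, broker address, and checkpoint path are assumptions rather than the repository's actual code:

from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import ArrayType, DoubleType, LongType, StringType, StructField, StructType

spark = SparkSession.builder.appName("partitioner").getOrCreate()

# assumed message layout: a batch of trades under the "data" field,
# each with symbol (s), price (p), volume (v) and timestamp (t)
schema = StructType([StructField("data", ArrayType(StructType([
    StructField("s", StringType()),
    StructField("p", DoubleType()),
    StructField("v", DoubleType()),
    StructField("t", LongType()),
])))])

raw = (spark.readStream.format("kafka")
       .option("kafka.bootstrap.servers", "kafka:9092")
       .option("subscribe", "input")
       .load())

# one trade per output row, keyed by the stock symbol so that all trades
# of a given stock land in the same partition of partitioned_input
trades = (raw.select(F.from_json(F.col("value").cast("string"), schema).alias("msg"))
          .select(F.explode("msg.data").alias("trade"))
          .select(F.col("trade.s").alias("key"),
                  F.to_json(F.col("trade")).alias("value")))

query = (trades.writeStream.format("kafka")
         .option("kafka.bootstrap.servers", "kafka:9092")
         .option("topic", "partitioned_input")
         .option("checkpointLocation", "/tmp/partitioner-checkpoint")
         .start())
query.awaitTermination()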

Queries

Precomputed

A.1 Number of transactions for each stock for each hour in the last 6 hours, excluding the previous hour
Endpoint: GET /transactions/hour

A.2 Total trading volume for each stock for the last 6 hours, excluding the previous hour
Endpoint: GET /volumes/hour

A.3 Number of trades and their total volume for each hour in the last 12 hours, excluding the previous hour
Endpoint: GET /hourly-stats

Ad-Hoc

B.1 Number of trades processed in a specific stock in the last N minutes, excluding the last minute
Endpoint: GET /transactions/minutes/{stock_symbol}?N=<int>

B.2 Top N stocks with the highest trading volume in the last hour
Endpoint: GET /volumes/hourly-top?N=<int>

B.3 Stock current price based on its symbol
Endpoint: GET /{stock_symbol}

You can also browse the detailed API documentation at 0.0.0.0:8080/docs after you have launched the system.

Precomputed Reports

For this type of query, we have a fixed time period of interest (6 or 12 hours). Thus, the data takes a limited amount of memory, so we decided to use an in-memory key-value database to reduce latency. We chose Redis, as we had heard a lot about it but never had hands-on experience using it. It can also be scaled into a resilient cluster with read replicas, which adds to our system's scalability and potential reliability.

The data is stored simply as key-value pairs per hour. We have keys hourly-stock:{query_hour} with values of the following form:

[
  {
    "stock_1": {
      "volume": <float>,
      "count": <int>
    }
  },
  {
    "stock_2": {
      "volume": <float>,
      "count": <int>
    }
  },
  ...
]

and keys hourly-aggregation:{query_hour} with values of the following form:

{
  "total_volume": <float>,
  "total_count": <int>
}

This data is then retrieved by our API, aggregated for 6 or 12 hours, and cached for the current hour. An important detail is that we set the time-to-live parameter for our keys so that hourly-stock:{query_hour} is deleted after 7 hours and hourly-aggregation:{query_hour} is deleted after 13 hours.
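
As a minimal sketch (assuming the redis-py client, the hostname redis, and JSON serialization, none of which are spelled out in this README), such a key and its TTL could be written like this:

import json
import redis

r = redis.Redis(host="redis", port=6379)

def write_hourly_stock_report(query_hour: int, per_stock: dict) -> None:
    # per_stock: {"AAPL": {"volume": 123.0, "count": 42}, ...}
    payload = json.dumps([{stock: stats} for stock, stats in per_stock.items()])
    # expire after 7 hours so stale hours never accumulate in memory
    r.set(f"hourly-stock:{query_hour}", payload, ex=7 * 3600)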

Ad-Hoc Reports

This query category is much more heterogeneous and incorporates three different databases. It would most probably also work with a single one, but we wanted to choose the most suitable solution for each problem and gain experience with different technologies.

B.1

For the B.1 query, we decided to store our data in a SQL RDBMS, specifically Postgres. The reason is that the data arrives at a constant and relatively low rate of 1 insert per stock (for each of the eight stocks) every 40 seconds. However, the N parameter is not limited, so we keep all the data, and its volume can potentially outgrow the capabilities of an in-memory DB.

The DDL for the schema of this database is

CREATE TABLE IF NOT EXISTS per_minute_data
(
    stock        VARCHAR,
    dt_minute    BIGINT,
    count        BIGINT NOT NULL,
    total_volume FLOAT  NOT NULL,
    PRIMARY KEY (stock, dt_minute)
);
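
With this table, the B.1 endpoint reduces to a single aggregation over the requested window. Below is a hedged sketch of such a lookup using psycopg2; the function name, connection settings, and exact window boundaries are illustrative assumptions, not the repository's actual code:

import psycopg2

def count_trades_last_n_minutes(stock: str, n: int, current_minute: int) -> int:
    # current_minute is the ordinal minute since the epoch (unix_ts // 60);
    # the window covers the N minutes preceding it, so the still-open minute is excluded
    query = """
        SELECT COALESCE(SUM(count), 0)
        FROM per_minute_data
        WHERE stock = %s
          AND dt_minute >= %s AND dt_minute < %s
    """
    with psycopg2.connect(host="postgres", dbname="postgres",
                          user="postgres", password="postgres") as conn:
        with conn.cursor() as cur:
            cur.execute(query, (stock, current_minute - n, current_minute))
            return cur.fetchone()[0]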

B.2

The B.2 query data is written and retrieved using the Cassandra database. Here, the rate of inserts is much higher, since we insert literally every streaming message into the database to compute the top highest volumes. Even for one hour (a relative hour, i.e., the last 60 minutes, not an absolute hour), the amount of data can be quite large. Thus, even though we do not especially need the partitioning and clustering features, we benefit from Cassandra's reliability and fast reads. Another important detail is that the table is created WITH default_time_to_live = 3600, which removes the need for filtering: the data is automatically deleted after an hour (3600 seconds), so it does not take up redundant memory and storage. The querying therefore reduces to the single CQL statement SELECT stock, SUM(volume) AS total_volume FROM trades GROUP BY stock;.

The DDL for the schema of the Cassandra table is:

CREATE TABLE IF NOT EXISTS big_data_project.trades
(
    timestamp BIGINT,
    stock     TEXT,
    volume    DOUBLE,
    PRIMARY KEY ( (stock), timestamp )
) WITH default_time_to_live = 3600;
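
A minimal sketch of how the B.2 endpoint could run this query with the cassandra-driver client and rank the results on the application side (the host name and function are assumptions; the GROUP BY on the partition key is what the table layout above enables):

from cassandra.cluster import Cluster

def top_n_by_volume(n: int) -> list[tuple[str, float]]:
    cluster = Cluster(["cassandra"])
    session = cluster.connect("big_data_project")
    # the table-level TTL guarantees only the last hour of trades is present,
    # so no time filter is needed here
    rows = session.execute(
        "SELECT stock, SUM(volume) AS total_volume FROM trades GROUP BY stock"
    )
    cluster.shutdown()
    # rank the per-stock totals in Python
    return sorted(((r.stock, r.total_volume) for r in rows),
                  key=lambda x: x[1], reverse=True)[:n]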

B.3

The B.3 query is computationally much simpler than the previous two. Moreover, it takes a constant and small amount of memory, so we decided to use Redis as our storage again. There, each stock has a key of the form latest-price:{stock_symbol} containing a value of the form {"latest_price": <float>}. The value is not a plain float only because it is written directly from the Spark Streaming app, which necessarily produces an object as its value. The actual value is computed as a volume-weighted average of all the prices for a specific stock in each micro-batch of the streaming data. For example, if within the last micro-batch there were two trades for Apple, one with volume 3 at a price of 190 and another with volume 1 at a price of 200, the updated price would be (3 * 190 + 1 * 200) / 4 = 192.5. If no trades for a specific stock appeared in a batch, its price is not updated.
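
A minimal PySpark sketch of this per-micro-batch aggregation (column names are assumptions; the real job also writes the result into Redis):

from pyspark.sql import functions as F

def latest_prices(batch_df):
    # volume-weighted average price per stock within one micro-batch
    return (batch_df.groupBy("stock")
            .agg((F.sum(F.col("price") * F.col("volume")) /
                  F.sum(F.col("volume"))).alias("latest_price")))

Applied to the Apple example above, this aggregation yields 192.5.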

Additional hourly_data and per_minute_data Topics

These topics are the way the Spark program communicates partially aggregated data to be written into the databases. We chose to store aggregated data for each individual hour / minute, so in the Spark program the data is grouped by the hour / minute in which it was produced, and every 40 minutes / 40 seconds the intermediate reports are sent into the corresponding topics. Each report can potentially contain data for two periods: the final aggregation for the previous period and intermediate results for the current one. Reports are sent as a group of messages, each containing the ID of the period, the stock symbol, and the aggregated volume and number of trades for that period and stock. Per-minute reports are simply written into the database upon receipt, while hourly ones are first aggregated to obtain the total number of trades and their total volume for the specified hour across all stocks. As the number of such reports is small, storing the data in the databases can be done by a simple Python program, and even if it fails, no data is lost, as it is still stored in Kafka.
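
A hedged sketch of such a consumer for the per-minute reports, assuming a kafka-python consumer, JSON messages with period_id, stock, volume, and count fields, and the Postgres table defined above (the actual field names in the repository may differ):

import json
import psycopg2
from kafka import KafkaConsumer

consumer = KafkaConsumer("per_minute_data",
                         bootstrap_servers="kafka:9092",
                         value_deserializer=lambda v: json.loads(v))
conn = psycopg2.connect(host="postgres", dbname="postgres",
                        user="postgres", password="postgres")

for message in consumer:
    report = message.value
    with conn.cursor() as cur:
        # overwrite any earlier intermediate report for the same (stock, minute)
        cur.execute(
            """
            INSERT INTO per_minute_data (stock, dt_minute, count, total_volume)
            VALUES (%s, %s, %s, %s)
            ON CONFLICT (stock, dt_minute)
            DO UPDATE SET count = EXCLUDED.count, total_volume = EXCLUDED.total_volume
            """,
            (report["stock"], report["period_id"], report["count"], report["volume"]),
        )
    conn.commit()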

The main idea behind these topics is to decouple the Spark program from insertion into the databases. Thanks to this, a Spark application running on one of the workers does not need to synchronize with the others, as all the data needed for its part of the output is fully present in its input. This is achieved by the additional step in data production, namely the partitioner. Besides exploding the messages from the API so that each message contains only one trade, this application also partitions all trades by stock symbol. To be precise, it writes each trade into the Kafka topic with a key equal to the stock symbol. This ensures that all trades of the same stock end up in the same partition of the Kafka topic, and therefore all trades for a specific stock are processed by exactly one worker. As the output is produced per stock, there is no need for synchronization between workers, since only one of them holds all the needed information.

One more crucial part to mention is the frequency of the reports. We will discuss hourly reports; the same logic applies to per-minute reports. The reports should be accessible within the hour after the respective hour ends, so the reporting period should not exceed 1 hour, otherwise it would be possible to retrieve an incomplete report. For example, if the reports were sent every 1 hour 10 minutes, one could be sent at 12:55 containing the final report for 11:00 - 12:00 and a partial one for 12:00 - 13:00 (as the data for this hour is still incomplete). The next report would be sent at 14:05, so a request at 14:03 would show inaccurate data for 12:00 - 13:00, as the final report had not arrived yet. On the other hand, such reports have no need to arrive too often; to keep the number of messages to process as small as possible, they should be sent rarely. Other important factors are processing time and the existence of late data, both of which can delay the arrival of accurate data. We decided to give a margin of 20 minutes to these factors combined, which resulted in a 40-minute period.

Caching on the Backend

Since our precomputed data have a period of 1 hour, it would be suboptimal to query Redis every time a user hits the endpoint. Thus, for the queries of category A, the data from Redis is retrieved only once per hour and cached in memory in a fairly simple way as a variable on the backend. The simplified caching procedure (with REDIS_DATA standing for the data freshly fetched from Redis) is as follows:

from datetime import datetime
from typing import Dict, Tuple

# (hour_id, cached data); hour 0 forces a refresh on the first request
cached_hourly_stats: Tuple[int, Dict] = (0, {})

def get_cached_hourly_stats() -> Dict:
    global cached_hourly_stats
    current_hour = int(datetime.now().timestamp() / 3600)
    if cached_hourly_stats[0] != current_hour:
        cached_hourly_stats = (current_hour, REDIS_DATA)  # fresh fetch from Redis
    return cached_hourly_stats[1]

🖥 Usage

First of all, specify your Finnhub API key as an environment variable for the websocket_to_kafka service in docker-compose.yaml. Then, to run everything, it is enough to run docker compose up -d in the root directory of this project. The API is accessible on port 8080.

Results

All the hour and minute IDs in our project are the ordinal numbers of the hour / minute since the epoch start. They are obtained by rounding down the ratio of the number of seconds since the epoch (the Unix timestamp) to the number of seconds in an hour / minute (3600 / 60).
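
In Python, this amounts to:

import time

# ordinal hour / minute since the Unix epoch
hour_id = int(time.time() // 3600)   # e.g. 2024-05-24 23:00:00 UTC -> 476831
minute_id = int(time.time() // 60)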

Precomputed queries

We started the application at 19:20 UTC on Friday; regular trading stops at 20:00, and for the following four hours trading is conducted only in the online (after-hours) session. Thus, the volumes for consecutive hours dropped significantly, and after four hours trading stopped completely. Overall, the application ran for approximately 7 hours, and the results of the precomputed queries are the following:

  • Number of transactions for each stock for each hour in the last 6 hours, excluding the previous hour
    Endpoint: GET /transactions/hour
// http://localhost:8080/transactions/hour/

{
  "transactions": [
    {
      "hour": 476831,
      "data": {
        "NVDA": 5925,
        "IBM": 18,
        "INTC": 239,
        "GOOG": 109,
        "AMD": 840,
        "NFLX": 119,
        "AAPL": 582
      },
      "datetime": "2024-05-24 23:00:00"
    },
    {
      "hour": 476830,
      "data": {
        "AMD": 856,
        "GOOG": 131,
        "AAPL": 706,
        "NVDA": 5919,
        "INTC": 217,
        "NFLX": 103,
        "IBM": 18
      },
      "datetime": "2024-05-24 22:00:00"
    },
    {
      "hour": 476829,
      "data": {
        "NFLX": 106,
        "IBM": 25,
        "AMD": 906,
        "INTC": 340,
        "AAPL": 781,
        "NVDA": 5041,
        "GOOG": 233
      },
      "datetime": "2024-05-24 21:00:00"
    },
    {
      "hour": 476828,
      "data": {
        "AMD": 1748,
        "IBM": 134,
        "AAPL": 1446,
        "NFLX": 420,
        "INTC": 796,
        "GOOG": 542,
        "NVDA": 13834
      },
      "datetime": "2024-05-24 20:00:00"
    },
    {
      "hour": 476827,
      "data": {
        "NVDA": 107003,
        "IBM": 9590,
        "AMD": 43212,
        "AAPL": 45477,
        "NFLX": 12609,
        "GOOG": 22091,
        "INTC": 22079
      },
      "datetime": "2024-05-24 19:00:00"
    }
  ],
  "missing_hours": [
    476832,
    476826
  ]
}
  • Total trading volume for each stock for the last 6 hours, excluding the previous hour
    Endpoint: GET /volumes/hour
// http://localhost:8080/volumes/hour/

{
  "transactions": [
    {
      "hour": 476831,
      "data": {
        "NVDA": 168107.0,
        "IBM": 374774.0,
        "INTC": 10476.0,
        "GOOG": 1582.0,
        "AMD": 47458.0,
        "NFLX": 1678.0,
        "AAPL": 14280.0
      },
      "datetime": "2024-05-24 23:00:00"
    },
    {
      "hour": 476830,
      "data": {
        "AMD": 61293.0,
        "GOOG": 1743.0,
        "AAPL": 20392.0,
        "NVDA": 168147.0,
        "INTC": 17611.0,
        "NFLX": 474.0,
        "IBM": 374677.0
      },
      "datetime": "2024-05-24 22:00:00"
    },
    {
      "hour": 476829,
      "data": {
        "NFLX": 404.0,
        "IBM": 1160.0,
        "AMD": 78926.0,
        "INTC": 19938.0,
        "AAPL": 17873.0,
        "NVDA": 187937.0,
        "GOOG": 8673.0
      },
      "datetime": "2024-05-24 21:00:00"
    },
    {
      "hour": 476828,
      "data": {
        "AMD": 5756245.0,
        "IBM": 1407319.0,
        "AAPL": 11827159.0,
        "NFLX": 758559.0,
        "INTC": 25270319.0,
        "GOOG": 3269324.0,
        "NVDA": 3938604.0
      },
      "datetime": "2024-05-24 20:00:00"
    },
    {
      "hour": 476827,
      "data": {
        "NVDA": 4984311.0,
        "IBM": 470708.0,
        "AMD": 3981019.0,
        "AAPL": 3776556.0,
        "NFLX": 340102.0,
        "GOOG": 1562814.0,
        "INTC": 3678432.0
      },
      "datetime": "2024-05-24 19:00:00"
    }
  ],
  "missing_hours": [
    476832,
    476826
  ]
}
  • Number of trades and their total volume for each hour in the last 12 hours, excluding the previous hour
    Endpoint: GET /hourly-stats
// http://localhost:8080/hourly-stats/

{
  "transactions": [
    {
      "hour": 476831,
      "data": {
        "total_volume": 618355.0,
        "total_count": 7832
      },
      "datetime": "2024-05-24 23:00:00"
    },
    {
      "hour": 476830,
      "data": {
        "total_volume": 644337.0,
        "total_count": 7950
      },
      "datetime": "2024-05-24 22:00:00"
    },
    {
      "hour": 476829,
      "data": {
        "total_volume": 314911.0,
        "total_count": 7432
      },
      "datetime": "2024-05-24 21:00:00"
    },
    {
      "hour": 476828,
      "data": {
        "total_volume": 52227529.0,
        "total_count": 18920
      },
      "datetime": "2024-05-24 20:00:00"
    },
    {
      "hour": 476827,
      "data": {
        "total_volume": 18793942.0,
        "total_count": 262061
      },
      "datetime": "2024-05-24 19:00:00"
    }
  ],
  "missing_hours": [
    476832,
    476826,
    476825,
    476824,
    476823,
    476822,
    476821,
    476820
  ]
}

The remarkable thing about all these responses is the list at the end - missing_hours. It contains the identifiers of all hours that should have been included in the response but for which no data was present. In these reports you can see that the data for the last hour (476832) is missing, as well as for all hours before the start of the application (476826 and earlier). The data for the last hour is missing because all trading had stopped by that time, so there was no data to aggregate.

Ad-Hoc queries

All the requests for ad-hoc queries were made much earlier than those for precomputed queries, while trades were still being conducted, so that they would return at least some data. The results were captured during the online (after-hours) session, so the numbers are not that big.

  • Number of trades processed in a specific stock in the last N minutes, excluding the last minute
    Endpoint: GET /transactions/minutes/{stock_symbol}?N=<int>

IBM last ten minutes volume

NVIDIA last ten minutes volume

  • Top N stocks with the highest trading volume in the last hour
    Endpoint: GET /volumes/hourly-top?N=<int>

Top N stocks

  • Stock current price based on its symbol
    Endpoint: GET /{stock_symbol}

Current price of the Apple stock
