Ingesting historical stock data into Elasticsearch


For those seeking to gain a deeper understanding of market trends, economic fluctuations, and consumer behavior, historical stock data is a treasure trove of insights waiting to be unearthed.

In this blog post, we’ll explore how to ingest historical stock data into Elasticsearch. From data preparation to indexing, we’ll delve into the steps required to harness the power of historical data and supercharge your analytics engine with actionable insights.

Infrastructure setup

In this section, we’ll dive into the different components of our architecture (Elasticsearch, Kibana, and our custom ingestion application), and how to bring them together in a containerized environment using Docker Compose.

The directory structure of the application and the files needed for the setup are as follows:

├── docker-compose.yml
└── ingestr
    ├── Dockerfile
    ├── main.py
    ├── mappings.json
    └── requirements.txt

The following docker-compose.yml configuration file defines the relationships between the different services and networks:

version: '3.8'

services:
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:7.16.3
    container_name: elasticsearch
    environment:
      - discovery.type=single-node
      - xpack.security.enabled=true
      - ELASTIC_PASSWORD=yourpassword
    ports:
      - "9200:9200"
    networks:
      - elastic

  kibana:
    image: docker.elastic.co/kibana/kibana:7.16.3
    container_name: kibana
    environment:
      - ELASTICSEARCH_HOSTS=http://elasticsearch:9200
      - ELASTICSEARCH_USERNAME=elastic
      - ELASTICSEARCH_PASSWORD=yourpassword
    ports:
      - "5601:5601"
    depends_on:
      - elasticsearch
    networks:
      - elastic

  ingestr:
    container_name: ingestr
    environment:
      - ELASTICSEARCH_URL=http://elasticsearch:9200
      - ELASTICSEARCH_USERNAME=elastic
      - ELASTICSEARCH_PASSWORD=yourpassword
    build:
      context: ./ingestr
      dockerfile: Dockerfile
    depends_on:
      - elasticsearch
    networks:
      - elastic

networks:
  elastic:
    driver: bridge
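
With this file in place, the whole stack can be brought up with docker compose up --build, and Kibana becomes available at http://localhost:5601 once Elasticsearch is healthy. As a quick sanity check, here is a minimal sketch (not part of the ingestion application) that connects from the host machine to the published port using the password defined above and prints the cluster info; adapt the URL and credentials to your own setup:

from elasticsearch import Elasticsearch

# Connect from the host to the port published in docker-compose.yml,
# using the ELASTIC_PASSWORD defined for the elasticsearch service
es = Elasticsearch(hosts=["http://localhost:9200"], http_auth=("elastic", "yourpassword"))

# Print basic cluster information to confirm the node is up
print(es.info())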

Ingestion application

Containerization

The following Dockerfile is used to build the image of the ingestion application written in Python. Step by step, it starts from the official Python base image, creates an unprivileged worker user, installs the dependencies listed in requirements.txt, copies the application code into the image, and runs main.py as the container’s default command.

When you build this Dockerfile, it produces an image that can also be run on its own with docker run -it <image_name>, where <image_name> is the name given to the image at build time; in our setup, Docker Compose builds and starts it as the ingestr service.

# Start from the official Python base image
FROM python

RUN pip install --upgrade pip

# Run the application as an unprivileged user
RUN useradd -ms /bin/bash worker
USER worker
WORKDIR /home/worker

# Make user-installed packages available on the PATH
ENV PATH="/home/worker/.local/bin:${PATH}"

# Install the dependencies first to take advantage of Docker layer caching
COPY --chown=worker:worker requirements.txt requirements.txt
RUN pip install --user -r requirements.txt

# Copy the application code
COPY --chown=worker:worker . .

# Run the ingestion script when the container starts
CMD ["python", "main.py"]

The dependencies of the application are defined in the requirements.txt file: yahoo_fin retrieves the historical data from Yahoo Finance, requests_html and lxml_html_clean are needed by its scraping code, and elasticsearch[async] provides the AsyncElasticsearch client used to talk to the cluster:

requests_html
lxml_html_clean
yahoo_fin
elasticsearch[async]<8  # keep the client on the same major version as the 7.x server

Application logic

The following main.py file implements the ingestion application that feeds historical stock data from Yahoo Finance into Elasticsearch. The code is organized around three parts: fetching the historical data from Yahoo Finance, the Elasticsearch helper functions, and the main entry point that ties them together:

import asyncio
from elasticsearch import AsyncElasticsearch
from elasticsearch.helpers import async_bulk
import json
import yahoo_fin.stock_info as si
import os

#-------------------------------------------
# Yahoo Finance Data
#-------------------------------------------
def get_historical_data():
  # Download the daily historical data of every Dow Jones ticker,
  # one pandas DataFrame per ticker
  dow_list = si.tickers_dow()
  print(f"Tickers in Dow Jones ({len(dow_list)}): {dow_list}")
  dow_historical = []
  for ticker in dow_list:
    dow_historical.append(si.get_data(ticker))
  return dow_historical

#-------------------------------------------
# Elastic Functions
#-------------------------------------------
async def create_index(es, name, mappings):
  if not await es.indices.exists(index=name):
    await es.indices.create(index=name, mappings=mappings)
    print(f"Index created: {name}")
  else:
    print(f"Index exists: {name}")


async def ingest_data(es, index_name):
  # Flatten each ticker's DataFrame into one document per trading day,
  # matching the fields defined in mappings.json, and bulk-index them
  actions = []
  for df in get_historical_data():
    df = df.reset_index().rename(columns={'index': 'date'})
    for doc in df.to_dict(orient='records'):
      doc['date'] = doc['date'].strftime('%Y-%m-%d')
      actions.append({'_index': index_name, '_source': doc})
  await async_bulk(es, actions)
  print(f"Ingested {len(actions)} documents into {index_name}")

#-------------------------------------------
# Main Function
#-------------------------------------------
async def main():
  # Connect to Elasticsearch with the credentials defined in docker-compose.yml
  es_url = os.environ.get('ELASTICSEARCH_URL')
  es_user = os.environ.get('ELASTICSEARCH_USERNAME', 'elastic')
  es_password = os.environ.get('ELASTICSEARCH_PASSWORD')
  es = AsyncElasticsearch(hosts=[es_url], http_auth=(es_user, es_password))

  # Load the index mappings
  with open('mappings.json', 'r') as f:
    mappings = json.load(f)
  
  # Create index
  await create_index(es, "stocks-index", mappings)
  # Ingest data
  await ingest_data(es, "stocks-index")
  # Close the client connection
  await es.close()

#-------------------------------------------
# Run Main Function
#-------------------------------------------
try:
  asyncio.run(main())
except KeyboardInterrupt:
  print('keyboard interrupt, bye')
  pass
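
Before moving on to the index mapping, it helps to look at the shape of the data returned by yahoo_fin: get_data gives back one pandas DataFrame per ticker, indexed by date, with the columns open, high, low, close, adjclose, volume, and ticker. The following minimal sketch (AAPL is just an arbitrary example) shows the transformation applied in ingest_data to turn each row into an Elasticsearch document:

import yahoo_fin.stock_info as si

# Download daily historical data for a single ticker
df = si.get_data("AAPL")

# Turn the date index into a regular column, one record per trading day
df = df.reset_index().rename(columns={'index': 'date'})
docs = df.to_dict(orient='records')

# Each record now carries the fields declared in mappings.json
print(docs[0])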

Indexing stock data

The mappings.json file defines the mapping for the stocks index. It specifies the structure of the documents with eight fields: ticker, date, open, close, adjclose, high, low, and volume.

{
  "properties": {
    "ticker": {
        "type": "keyword"
    },
    "date": {
        "type": "date"
    },
    "open": {
        "type": "float"
    },
    "close": {
        "type": "float"
    },
    "adjclose": {
        "type": "float"
    },
    "high": {
        "type": "float"
    },
    "low": {
        "type": "float"
    },
    "volume": {
        "type": "integer"
    }
  }
}
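
Once the ingestr container has finished, the data can be explored in Kibana or queried directly with the Elasticsearch client. The following sketch (the ticker and connection settings are arbitrary examples, matching the compose file above) fetches the most recent documents for a single ticker and computes its average closing price with a simple aggregation:

from elasticsearch import Elasticsearch

# Same connection settings as docker-compose.yml; adapt to your setup
es = Elasticsearch(hosts=["http://localhost:9200"], http_auth=("elastic", "yourpassword"))

resp = es.search(
  index="stocks-index",
  query={"term": {"ticker": "AAPL"}},
  sort=[{"date": "desc"}],
  aggs={"avg_close": {"avg": {"field": "close"}}},
  size=5,
)

# Most recent daily documents for the ticker
for hit in resp["hits"]["hits"]:
  print(hit["_source"]["date"], hit["_source"]["close"])

# Average closing price over the whole history
print("Average close:", resp["aggregations"]["avg_close"]["value"])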

That’s all folks

I hope you enjoyed this article. Feel free to leave a comment or reach out on Twitter @bachiirc.