Combining vision and language often improves performance across a wide range of tasks. In this article, we will see how to leverage a pre-trained model to search videos by activity.

We will use a model from TensorFlow Hub that combines Multiple Instance Learning (MIL) and Noise Contrastive Estimation (NCE), or simply MIL-NCE. This model was trained on HowTo100M, a large-scale dataset of 136M narrated video clips in which content creators explain the tasks being performed in the video.

This model can generate embeddings for both video and text. We will use this capability to retrieve the video that best matches a query by computing a similarity score between the two embeddings.

To learn more about the model and the general task it was trained for, check the original paper on arxiv.org.

Let's start by importing the dependencies:

import math
import uuid
import cv2
import numpy as np
from pathlib import Path
import tensorflow as tf
import tensorflow_hub as tfhub
from google.colab.patches import cv2_imshow
from IPython.display import Image

We need a function that downloads a video from a given URL and stores it locally under a random file name. We will use it later to get the test videos:

def fetch_video(video_url):
    extension = Path(video_url).suffix
    file_name = str(uuid.uuid4()) + extension
    path = tf.keras.utils.get_file(file_name, video_url, cache_dir='.', cache_subdir='.')
    return path
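
To make the naming logic concrete, here is a small sketch of how the local file name is derived. The helper `make_local_name` and the example URL are hypothetical and only mirror what `fetch_video` does before downloading, so no network access is needed:

```python
import uuid
from pathlib import Path


def make_local_name(video_url):
    """Build a random local file name that keeps the extension of the URL."""
    # Path(...).suffix returns the last extension, including the dot
    extension = Path(video_url).suffix
    # uuid4 produces a random 36-character identifier, so names should not collide
    return str(uuid.uuid4()) + extension


# Hypothetical URL used only for illustration
name = make_local_name('https://example.com/media/abc123/clip.gif')
print(name)
```

Note that this approach assumes the URL ends with the file name. If the URL carried a query string, it would end up in the suffix as well.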

Next, we define functions that crop each frame to the largest square centered in the frame:

def get_center_square_coordinates(height, width):
    dimension = min(width, height)
    x_start = (width // 2) - (dimension // 2)
    x_end = x_start + dimension
    y_start = (height // 2) - (dimension // 2)
    y_end = y_start + dimension
    return x_start, y_start, x_end, y_end

def crop_center(in_frame):
    height, width = in_frame.shape[:2]
    x_start, y_start, x_end, y_end = get_center_square_coordinates(height, width)
    out_frame = in_frame[y_start:y_end, x_start:x_end]
    return out_frame
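
As a quick sanity check of the cropping arithmetic, the sketch below applies the same idea to a tiny fake frame. The function `center_square` is a condensed, illustrative version of the two helpers above, and the 4x6 array stands in for a real image:

```python
import numpy as np


def center_square(frame):
    """Crop the largest centered square out of a (height, width, ...) array."""
    height, width = frame.shape[:2]
    side = min(height, width)
    y_start = (height // 2) - (side // 2)
    x_start = (width // 2) - (side // 2)
    return frame[y_start:y_start + side, x_start:x_start + side]


# A fake 4x6 "frame" whose pixel values encode the column index
frame = np.tile(np.arange(6), (4, 1))
cropped = center_square(frame)
print(cropped.shape)  # (4, 4)
print(cropped[0])     # [1 2 3 4]
```

The leftmost and rightmost columns are dropped, which is what we want for a landscape frame.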

Next, we define a set of helper functions to read a video file and extract at most 32 frames (if the video has fewer frames, we repeat some):

def extract_frames(video_path, max_frames, resize=(224, 224)):
    """Extract at most max_frames from the video"""
    capture = cv2.VideoCapture(video_path)
    frames = []
    while len(frames) < max_frames:
        frame_read, frame = capture.read()
        if not frame_read:
            break
        frame = crop_center(frame)
        frame = cv2.resize(frame, resize)
        frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        frames.append(frame)

    capture.release()
    return np.array(frames)

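def repeat_frames(in_frames, max_frames):
    """Repeat frames until reaching a length of at least max_frames"""
    if len(in_frames) == 0:
        raise ValueError('No frames could be read from the video')
    if len(in_frames) >= max_frames:
        return in_frames
    # number of copies of each frame needed to reach max_frames
    repetitions = math.ceil(max_frames / len(in_frames))
    # each frame is duplicated consecutively, so temporal order is preserved
    out_frames = in_frames.repeat(repetitions, axis=0)
    return out_frames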

def read_video(video_path, max_frames=32, resize=(224, 224)):
    # read the video and extract its frames
    frames = extract_frames(video_path, max_frames, resize)
    # make sure we have max_frames
    frames = repeat_frames(frames, max_frames)
    # select only max_frames
    frames = frames[:max_frames]
    return frames / 255.0
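
To see what padding a short video looks like, here is a toy sketch that compares the `repeat` call used in `repeat_frames` with `np.tile`. The tiling variant is only an alternative shown for contrast, not part of the code above, and each "frame" is reduced to a single number for readability:

```python
import numpy as np

# Three fake "frames", each reduced to a single number for readability
frames = np.array([0, 1, 2])
max_frames = 5
repetitions = int(np.ceil(max_frames / len(frames)))  # 2

# ndarray.repeat duplicates each element in place (what repeat_frames does)
repeated = frames.repeat(repetitions, axis=0)[:max_frames]

# np.tile loops the whole sequence instead (an alternative, shown for contrast)
tiled = np.tile(frames, repetitions)[:max_frames]

print(repeated)  # [0 0 1 1 2]
print(tiled)     # [0 1 2 0 1]
```

With `repeat`, the clip plays in slow motion and the final slice may trim the last duplicated frames. With `tile`, the clip loops. Which one suits the model better is not something this article evaluates.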

For the test, we will use random GIF files

URLS = [
'https://media.giphy.com/media/Wrm9ZTb7LFMcESBA4i/giphy.gif',
'https://media.giphy.com/media/Bom5hTsAsI8sFnH68k/giphy.gif',
'https://media.giphy.com/media/2aLiVCqTZmxwXeRfMh/giphy.gif',
'https://media.giphy.com/media/ngzhAbaGP1ovS/giphy.gif']
Image(url=URLS[-1])

Use the previous helper functions to download the videos and extract the frames:

VIDEOS = [read_video(fetch_video(url)) for url in URLS]
Downloading data from https://media.giphy.com/media/Wrm9ZTb7LFMcESBA4i/giphy.gif
7331840/7331587 [==============================] - 0s 0us/step
7340032/7331587 [==============================] - 0s 0us/step
Downloading data from https://media.giphy.com/media/Bom5hTsAsI8sFnH68k/giphy.gif
5185536/5181685 [==============================] - 0s 0us/step
5193728/5181685 [==============================] - 0s 0us/step
Downloading data from https://media.giphy.com/media/2aLiVCqTZmxwXeRfMh/giphy.gif
23699456/23697427 [==============================] - 0s 0us/step
23707648/23697427 [==============================] - 0s 0us/step
Downloading data from https://media.giphy.com/media/ngzhAbaGP1ovS/giphy.gif
1130496/1126589 [==============================] - 0s 0us/step
1138688/1126589 [==============================] - 0s 0us/step

For the search test, we define these queries:

QUERIES = ['biking', 'launching', 'skiing', 'skateboarding']

We load the pre-trained model and its weights from TF Hub, where it is available at https://tfhub.dev/deepmind/mil-nce/s3d/1

model = tfhub.load('https://tfhub.dev/deepmind/mil-nce/s3d/1')

Next, we define a helper function that will use the model to calculate the embeddings of the input video:

def get_video_embeddings(model, input_frames):
    frames = tf.cast(input_frames, dtype=tf.float32)
    frames = tf.constant(frames)
    video_model = model.signatures['video']
    video_embedding = video_model(frames)
    video_embedding = video_embedding['video_embedding']
    return video_embedding

Similarly, we define a helper function that will use the model to calculate the embeddings of the input text:

def get_text_embeddings(model, input_words):
    words = tf.constant(input_words)
    text_model = model.signatures['text']
    text_embedding = text_model(words)
    text_embedding = text_embedding['text_embedding']
    return text_embedding

Now we calculate the embeddings for the text queries and the video frames:

video_emb = get_video_embeddings(model, np.stack(VIDEOS, axis=0))
text_emb = get_text_embeddings(model, np.array(QUERIES))

We combine both embeddings to calculate similarity scores. Each score is the dot product between a text embedding and a video embedding, and a higher score means a closer match:

scores = np.dot(text_emb, tf.transpose(video_emb))
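
The layout of the resulting matrix is easier to see on a toy example. The sketch below uses made-up 3-dimensional embeddings (the real ones are much larger and come from the model) to show that row `i` of the result holds the scores of query `i` against every video:

```python
import numpy as np

# Made-up 3-dimensional embeddings: 2 queries and 3 videos (illustrative values)
text_emb = np.array([[1.0, 0.0, 0.0],
                     [0.0, 1.0, 0.0]])
video_emb = np.array([[0.9, 0.1, 0.0],
                      [0.2, 0.8, 0.1],
                      [0.1, 0.1, 0.9]])

# scores[i, j] is the dot product between query i and video j
scores = np.dot(text_emb, video_emb.T)
print(scores.shape)  # (2, 3)

# Index of the best-matching video for every query
best = scores.argmax(axis=1)
print(best)  # [0 1]
```

This is why indexing `scores[0]` later on gives the scores of all videos for the first query.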

To display the search results, we need a frame to represent each video. For instance, we can take the first frame:

def get_one_frame(video):
    return video[0]

def process_frame(frame):
    return cv2.cvtColor((frame * 255.0).astype('uint8'), cv2.COLOR_RGB2BGR)

first_frames = [process_frame(get_one_frame(v)) for v in VIDEOS]
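
One detail worth knowing about the conversion back to 8-bit pixels: `astype('uint8')` truncates rather than rounds, so a value that floating-point arithmetic leaves slightly below an integer would lose one intensity level. Whether that actually happens depends on the value, and it is visually negligible here. The sketch below is an optional variant, not the code used in this article, that rounds before casting and checks that the round trip is exact for every possible pixel value:

```python
import numpy as np

# Every possible 8-bit pixel value
original = np.arange(256, dtype=np.uint8)

# Same normalization as read_video
normalized = original / 255.0

# Rounding before the cast protects against values that land just below an integer
restored = np.round(normalized * 255.0).astype(np.uint8)

print(np.array_equal(original, restored))  # True
```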

This is a helper function that we will use to annotate the frames with their respective similarity score for the query:

def annotate_frame(in_frame, text):
    out_frame = in_frame.copy()
    cv2.putText(out_frame, text, (8, 15), fontFace=cv2.FONT_HERSHEY_COMPLEX, fontScale=0.6, color=(255, 255, 255), thickness=2)
    return out_frame

Finally, we can perform the search. In this case, we take the scores of the first query, biking, and sort the frames by their similarity to the text embedding (the higher the score, the closer the match). We then annotate each frame with its position in the results and its score.

query_scores = scores[0]

sorted_results = sorted(list(zip(first_frames, query_scores)), key=lambda p: p[-1], reverse=True)
annotated_frames = []
for i, (f, s) in enumerate(sorted_results, start=1):
    frame = annotate_frame(f.copy(), f'#{i} - Score: {s:.2f}')
    annotated_frames.append(frame)

cv2_imshow(np.hstack(annotated_frames))

Notice how well the model was able to choose the mountain biking video as the best match for the biking query.

We can do this again with another query, for instance skateboarding:

query_scores = scores[-1]

sorted_results = sorted(list(zip(first_frames, query_scores)), key=lambda p: p[-1], reverse=True)
annotated_frames = []
for i, (f, s) in enumerate(sorted_results, start=1):
    frame = annotate_frame(f.copy(), f'#{i} - Score: {s:.2f}')
    annotated_frames.append(frame)

cv2_imshow(np.hstack(annotated_frames))

Notice how this time the model did not pick the dog video as we would have expected. Still, the first three videos have very close scores for this query.
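
The sorting step is identical for both queries, so it could be factored into a small helper. The sketch below is one possible way to do it. The function name `rank_by_score`, the labels and the scores are all illustrative placeholders, not values produced by the model:

```python
def rank_by_score(items, scores):
    """Return (rank, item, score) tuples sorted from highest to lowest score."""
    pairs = sorted(zip(items, scores), key=lambda p: p[1], reverse=True)
    return [(rank, item, score)
            for rank, (item, score) in enumerate(pairs, start=1)]


# Placeholder labels and scores; in the article the items are the first frames
labels = ['video_a', 'video_b', 'video_c', 'video_d']
example_scores = [7.5, 1.2, 3.3, 2.8]

ranking = rank_by_score(labels, example_scores)
for rank, label, score in ranking:
    print(f'#{rank} - {label} - Score: {score:.2f}')
```

With such a helper, searching for another query would only require passing a different row of the score matrix.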

I hope you enjoyed this article. Feel free to leave a comment or reach out on Twitter @bachiirc.