Network observability with Elasticsearch on GCP
17 Apr 2023 by dzlab
In a previous article, we discussed the need for setting up a centralized log management platform to debug network issues.
In the remainder of this article, we will briefly describe the architecture and then deep dive into deploying it on GCP using Cloud Deployment Manager.
Overview of the architecture
The above diagram illustrates a high-level solution for building a network observability platform with Elasticsearch on GCP.
Logs from VPC Flow Logs are batched into files and uploaded to a Cloud Storage bucket. Every time a file is uploaded, a new entry is appended to a Pub/Sub queue with information about the file. This triggers a Cloud Function that processes the logs and ingests them into Elasticsearch. Once the logs reach Elasticsearch, they can be retrieved through Kibana or used to populate a custom dashboard.
In addition to VPC Flow Logs, other sources of logs can be integrated into this architecture to debug other types of issues (e.g. permission errors), for instance Cloud Audit Logs and Cloud CDN logs in batch mode.
In some cases, the Cloud Function encounters errors while processing a log file. In such cases, it can forward the original Cloud Storage event to a Dead-Letter Queue (DLQ), from which the failed messages can later be replayed to the main queue for reprocessing.
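For illustration, here is a minimal sketch of what that forwarding could look like inside the function, assuming a hypothetical DLQ topic named logs-dlq-topic (the topic name and the shape of the error payload are placeholders, not part of the deployment below):
// Publish the original Cloud Storage event to a hypothetical DLQ topic
const {PubSub} = require('@google-cloud/pubsub');

const pubsub = new PubSub();
// Assumed DLQ topic, to be created alongside the other resources
const dlqTopic = pubsub.topic('logs-dlq-topic');

async function forwardToDlq(event, error) {
  // Keep the original event so the message can be replayed later
  await dlqTopic.publishMessage({
    json: {originalEvent: event, error: error.message},
  });
}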
Deploying with Cloud Deployment Manager
The Elasticsearch cluster can be deployed from the GCP Marketplace, while the other GCP resources, namely the Cloud Storage bucket, the Pub/Sub queues, the Cloud Function, and the VPC Flow Logs configuration, can be provisioned with Cloud Deployment Manager.
First, declare the different resources as follows:
resources:
# Cloud Storage bucket where log files will be uploaded
- name: logs-bucket
  type: storage.v1.bucket
  properties:
    name: logs-bucket
    location: us-central1
    storageClass: STANDARD
# Pub/Sub topic for notifications when log files are uploaded
- name: logs-upload-topic
  type: pubsub.v1.topic
  properties:
    topic: logs-upload-topic
# Cloud Function that processes log files and ingests them into Elasticsearch
- name: logs-processing-function
  type: cloudfunctions.v1.function
  properties:
    name: logs-processing-function
    runtime: nodejs14
    entryPoint: handler
    httpsTrigger: {}
    sourceArchiveUrl: gs://my-bucket/my-function.zip
Then, we need to make sure that the VPC Flow Logs end up in the designated Cloud Storage bucket. Flow logs are enabled per subnetwork and are written to Cloud Logging, from where a log sink (with destination storage.googleapis.com/logs-bucket) exports them to the bucket in batches. For instance, using Cloud Deployment Manager we can enable flow logs on a subnetwork as follows:
resources:
# Subnetwork with VPC Flow Logs enabled
- name: my-subnet
  type: compute.v1.subnetwork
  properties:
    name: my-subnet
    network: projects/my-project/global/networks/my-network
    region: us-central1
    ipCidrRange: 10.0.0.0/20
    enableFlowLogs: true
When a new VPC Flow Log file is uploaded to the Cloud Storage bucket, we want to put an entry in the Pub/Sub queue with information about the file, such as the bucket name, the object name, and its size.
Using gsutil, we can create such a notification rule by providing the source bucket to watch for uploads and the destination Pub/Sub topic where the notifications will be sent. This is an example configuration:
gsutil notification create -t logs-upload-topic -f json -e OBJECT_FINALIZE gs://logs-bucket
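For reference, the JSON-formatted notification carries the object metadata (bucket, name, size, ...) base64-encoded in the message data, along with attributes such as bucketId and objectId. A minimal sketch of extracting the fields we need from a received message (the helper name is illustrative):
// Decode a Cloud Storage notification delivered through Pub/Sub and
// return the pieces of object metadata the processing function needs.
function parseStorageNotification(message) {
  // message.data is the base64-encoded object metadata JSON
  const payload = JSON.parse(Buffer.from(message.data, 'base64').toString());
  return {
    bucket: payload.bucket,
    name: payload.name,
    size: payload.size,
  };
}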
Next, we need to wire the Cloud Function to the Pub/Sub queue so that it is invoked every time a new entry is added, i.e. whenever a log file is uploaded. Since the function exposes an HTTP endpoint, we can use a push subscription. In Cloud Deployment Manager, we can define it as follows:
- name: logs-subscription
  type: pubsub.v1.subscription
  properties:
    subscription: logs-subscription
    topic: projects/my-project/topics/logs-upload-topic
    pushConfig:
      pushEndpoint: https://us-central1-my-project.cloudfunctions.net/logs-processing-function
Save the content from all the previous snippets into resources.yaml, then provision them with:
gcloud deployment-manager deployments create logs-deployment --config resources.yaml
Inside the Cloud Function, we need to implement the logic to process log files and ingest them into Elasticsearch. The following snippet illustrates a very simplified version:
// Import the Cloud Storage and Elasticsearch client libraries
const {Storage} = require('@google-cloud/storage');
const elasticsearch = require('elasticsearch');

// Instantiate the clients once per function instance
const storage = new Storage();
const es = new elasticsearch.Client({
  host: 'http://elasticsearch.example.com:9200',
});

exports.handler = async (event, context) => {
  // Get the bucket and file name from the Cloud Storage event
  const fileName = event.name;
  const bucketName = event.bucket;
  // Download the file contents from Cloud Storage
  const [contents] = await storage.bucket(bucketName).file(fileName).download();
  const data = contents.toString();
  // Index the file contents into Elasticsearch
  await es.index({
    index: 'logs',
    type: 'doc',
    id: fileName,
    body: {
      message: data,
    },
  });
  // Return a success message
  return {
    message: `File ${fileName} ingested into Elasticsearch`,
  };
};
After everything is deployed, the VPC Flow Logs will be uploaded to Cloud Storage and then ingested into Elasticsearch. We can verify that the logs are being ingested by querying Elasticsearch. For example, the following command searches for all logs that contain the word "error":
curl -XGET 'http://<ELASTICSEARCH_HOST>:<ELASTICSEARCH_PORT>/logs/_search?q=error'
If the logs are being ingested, you should see a response that contains a list of documents that match the search criteria.
Searching the logs
Elasticsearch provides very powerful capabilities to search and analyze large amounts of data, with a simple yet extensive search syntax. For instance, to retrieve from the logs all REJECTED network traffic with a source IP in the 10.0.0.0/8 CIDR range and a destination IP present, we can use the following query:
{
  "query": {
    "bool": {
      "must": [
        {
          "match": {
            "type": "REJECTED"
          }
        },
        {
          "range": {
            "source_ip": {
              "gte": "10.0.0.0",
              "lte": "10.255.255.255"
            }
          }
        },
        {
          "exists": {
            "field": "destination_ip"
          }
        }
      ]
    }
  }
}
The above query matches all documents that have a type of “REJECTED”, a source IP address that falls within the 10.0.0.0/8 CIDR range (this assumes source_ip is mapped as an ip field so the range comparison is not lexicographic), and a destination IP address that is present.
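Sticking with the same Node.js client used in the Cloud Function, and assuming the same Elasticsearch host, the query can also be run programmatically; a minimal sketch:
// Run the bool query against the logs index and print the matching documents
const elasticsearch = require('elasticsearch');

const es = new elasticsearch.Client({
  host: 'http://elasticsearch.example.com:9200',
});

async function searchRejectedTraffic() {
  const response = await es.search({
    index: 'logs',
    body: {
      query: {
        bool: {
          must: [
            {match: {type: 'REJECTED'}},
            {range: {source_ip: {gte: '10.0.0.0', lte: '10.255.255.255'}}},
            {exists: {field: 'destination_ip'}},
          ],
        },
      },
    },
  });
  console.log(response.hits.hits);
}

searchRejectedTraffic();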
We can simplify our search query using the Kibana Query Language (KQL), a concise way to search and filter data in Elasticsearch from Kibana. KQL lets you specify the criteria for your search as field:value pairs, supports wildcards and range operators, and allows logical operators (e.g. AND, OR, NOT) to combine multiple criteria.
Back to our search query, we can simplify it using Kibana query syntax as follows:
source.ip:10.0.0.0/8 AND event.action:rejected AND destination.ip:*
This query will return all rejected network traffic with a source IP in the 10.0.0.0/8 CIDR range and a destination IP present.
That’s all folks
I hope you enjoyed this article. Feel free to leave a comment or reach out on Twitter @bachiirc.