Google PubSub: Number of Unread Messages

This week, I explored different techniques for handling long-running tasks, such as calls to Large Language Models (LLMs), and decided to introduce a queue to better manage our requests. To achieve this, I deployed a Google PubSub topic with a subscription that our workers pull from. We also wanted to give our users an estimate of how long it would take before their task is completed. To do that, I needed the approximate number of unread messages waiting in the queue.
After examining the Google PubSub API, I discovered that it doesn't provide direct access to the number of unread messages. However, I found an alternative in Google Cloud Monitoring's Metrics Explorer: PubSub publishes a metric for the number of undelivered messages per subscription, and that metric can be queried programmatically. In the following section, I will share some TypeScript code that demonstrates how this works.

The first thing you need to do is install the Google Cloud packages:

npm install @google-cloud/monitoring @google-cloud/pubsub

The key metric you are looking for is the number of undelivered messages (num_undelivered_messages), which is reported per subscription rather than per topic. Metrics Explorer has a UI for building and running queries against different parts of your infrastructure. The following is the MQL (Monitoring Query Language) query that I used to get the unread messages for one of my subscriptions:

fetch pubsub_subscription
| metric 'pubsub.googleapis.com/subscription/num_undelivered_messages'
| filter (resource.subscription_id == '${subscriptionName}')
| group_by 1m,
    [value_num_undelivered_messages_mean: mean(value.num_undelivered_messages)]
| every 1m
| group_by [],
    [value_num_undelivered_messages_mean_aggregate:
      aggregate(value_num_undelivered_messages_mean)]
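
Since the query contains a ${subscriptionName} placeholder, it has to be turned into a string before it can be sent to the API. The following is a minimal sketch of one way to do that. The helper name buildUndeliveredMessagesQuery is my own invention for illustration, and the check on the subscription ID is a simple precaution against breaking the quoted string, not an official validation rule.

```typescript
// Hypothetical helper (not part of any Google library): builds the MQL
// query shown above for a single subscription ID.
function buildUndeliveredMessagesQuery(subscriptionId: string): string {
  // Precaution: refuse IDs containing quotes, backslashes, or whitespace,
  // since they would break out of the quoted MQL string literal.
  if (subscriptionId.length === 0 || /['"\\\s]/.test(subscriptionId)) {
    throw new Error(`Invalid subscription ID: ${subscriptionId}`);
  }

  return [
    "fetch pubsub_subscription",
    "| metric 'pubsub.googleapis.com/subscription/num_undelivered_messages'",
    `| filter (resource.subscription_id == '${subscriptionId}')`,
    "| group_by 1m,",
    "    [value_num_undelivered_messages_mean: mean(value.num_undelivered_messages)]",
    "| every 1m",
    "| group_by [],",
    "    [value_num_undelivered_messages_mean_aggregate:",
    "      aggregate(value_num_undelivered_messages_mean)]",
  ].join("\n");
}
```

Calling buildUndeliveredMessagesQuery('my-subscription') returns the full query text with the filter pointing at that subscription.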

The following code runs the above query to get the number of unread messages for your subscription:

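import monitoring from '@google-cloud/monitoring';

// Returns the approximate number of undelivered (unacked) messages.
async function getUnackedMessages(topicId: string, projectId: string, region: string): Promise<number> {
  const monitoringClient = new monitoring.QueryServiceClient();

  try {
    const queryRequest = {
      name: `projects/${projectId}`,
      // Placeholder: paste the MQL query from above here,
      // with the subscription ID filled in.
      query: 'QUERY FROM ABOVE',
    };

    // queryTimeSeries resolves to a tuple; its first element is the list of time series.
    const [response] = await monitoringClient.queryTimeSeries(queryRequest);

    // Read the first point of the first series, guarding against missing data.
    const value = response[0]?.pointData?.[0]?.values?.[0]?.doubleValue;
    if (value !== undefined && value !== null) {
      return Number(value);
    } else {
      return 0; // Default when the response contains no data points
    }
  } catch (error) {
    console.error('Error fetching unacked messages:', error);
    throw error;
  }
}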
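
Once the backlog count is available, turning it into a wait estimate for the user is simple arithmetic. The sketch below shows one way to do it, assuming tasks take roughly the same time on average and a fixed number of workers pull from the subscription. The function name and its parameters (avgTaskSeconds, workers) are illustrative assumptions on my part, not part of any Google API.

```typescript
// Hypothetical helper: estimates how long a newly queued task waits
// until it completes, given the current backlog.
function estimateWaitSeconds(
  backlog: number,        // undelivered messages ahead of the new task
  avgTaskSeconds: number, // assumed average processing time per task
  workers: number,        // assumed number of parallel workers
): number {
  if (backlog < 0 || avgTaskSeconds < 0 || workers <= 0) {
    throw new RangeError('backlog and avgTaskSeconds must be >= 0, workers must be > 0');
  }
  // Tasks are processed in "waves" of `workers` tasks at a time.
  // The new task is number (backlog + 1) in line, so it finishes
  // at the end of wave ceil((backlog + 1) / workers).
  const waves = Math.ceil((backlog + 1) / workers);
  return waves * avgTaskSeconds;
}
```

For example, with 10 messages in the backlog, 30 seconds per task, and 4 workers, the new task lands in the third wave, so the estimate is 90 seconds. Because the metric lags behind the real queue, treat the result as a rough figure.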

The getUnackedMessages function works well; however, there are a couple of things you need to keep in mind. Firstly, this is not a real-time API, so if you are adding messages to your topic, the changes will not be reflected immediately. I believe this is because Cloud Monitoring samples the metric at intervals rather than computing it when you call the API, so the value always lags the real queue by a minute or more. Secondly, I occasionally got an empty array back instead of the expected response, in which case the function returns 0 even though the queue may not be empty.
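
One way to soften the empty-response problem is to remember the last value that was actually returned and reuse it for a short while. The following is a minimal sketch of that idea. It assumes the caller passes null when the monitoring response was empty (instead of the default 0), and both the name createBacklogTracker and the five-minute default are arbitrary choices of mine.

```typescript
// Hypothetical helper: keeps the last known backlog and reuses it
// when a lookup comes back empty, as long as it is not too old.
function createBacklogTracker(maxAgeMs: number = 5 * 60 * 1000) {
  let last: { value: number; atMs: number } | null = null;

  // `fresh` is the newly fetched backlog, or null if the response was empty.
  // `nowMs` is injectable so the behavior can be tested without waiting.
  return function resolve(fresh: number | null, nowMs: number = Date.now()): number | null {
    if (fresh !== null) {
      last = { value: fresh, atMs: nowMs };
      return fresh;
    }
    // Empty response: fall back to the last value if it is recent enough.
    if (last !== null && nowMs - last.atMs <= maxAgeMs) {
      return last.value;
    }
    return null; // Nothing usable; the caller decides what to show.
  };
}
```

Returning null instead of 0 when nothing usable is available lets the caller distinguish "the queue is empty" from "the count is unknown" and, for instance, hide the estimate from the user in the second case.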

If you have any comments, questions, or insights regarding this approach, please feel free to reach out to me. I’m always eager to learn from the community and discuss ways to improve and optimize our solutions.