Timeout Error in Event Streaming

I have a fairly large multi-location ERPNext deployment, where each ERPNext instance needs to stay in bi-directional sync with the Head Office instance. I am using Frappe's Event Streaming for the replication.

Generally it works fine as long as the event backlog is under an hour or so. But once a location has a backlog of more than a day, with hundreds of pending transactions, I keep getting a Timeout Error when fetching events via the pull_from_node function.
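In case it is relevant, one knob I can turn is the background job timeout. Below is a minimal sketch of running the pull on the long queue with a larger timeout (the producer name is a placeholder, and this assumes the error comes from the worker's job timeout rather than the HTTP layer):

import frappe

# sketch: run the pull as a long-running background job;
# "Head Office" is a placeholder for the actual Event Producer name
frappe.enqueue(
	"frappe.event_streaming.doctype.event_producer.event_producer.pull_from_node",
	queue="long",
	timeout=3600,  # seconds, well above the default job timeout
	event_producer="Head Office",
)

This only buys time, though; it does not make the underlying call any faster.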

A little clarification here: if I simply fetch the Event Update Logs remotely using client.post_request, I can retrieve all the events without any trouble. So I believe the problem is in get_update_logs_for_consumer, which takes forever to process.
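For reference, the direct fetch that works is roughly equivalent to this (site URL, credentials and timestamp are placeholders):

from frappe.frappe_client import FrappeClient

# placeholders: remote producer site and a user with read access
client = FrappeClient("https://location1.example.com", "user@example.com", "password")

# fetching the raw Event Update Log documents directly returns quickly,
# even for a full day's backlog
logs = client.get_list(
	"Event Update Log",
	filters={"creation": (">", "2024-01-01 00:00:00")},
	fields=["update_type", "ref_doctype", "docname", "data", "name", "creation"],
	limit_page_length=0,  # no limit
)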

Any ideas on how to fix this would be appreciated.

PS: At peak, the Head Office fetches around 5,000 events per hour (roughly 1.4 events per second). I am just curious: is Event Streaming even suitable for that kind of volume?

import frappe

# Note: is_consumer_uptodate, get_unread_update_logs and mark_consumer_read
# are defined in the same module (event_update_log.py) and are used below.


@frappe.whitelist()
def get_update_logs_for_consumer(event_consumer, doctypes, last_update):
	"""
	Fetches all the UpdateLogs for the consumer
	It will inject old un-consumed Update Logs if a doc was just found to be accessible to the Consumer
	"""

	if isinstance(doctypes, str):
		doctypes = frappe.parse_json(doctypes)

	from frappe.event_streaming.doctype.event_consumer.event_consumer import has_consumer_access

	consumer = frappe.get_doc("Event Consumer", event_consumer)
	docs = frappe.get_list(
		doctype="Event Update Log",
		filters={"ref_doctype": ("in", doctypes), "creation": (">", last_update)},
		fields=["update_type", "ref_doctype", "docname", "data", "name", "creation"],
		order_by="creation desc",
	)

	result = []
	to_update_history = []
	for d in docs:
		if (d.ref_doctype, d.docname) in to_update_history:
			# will be notified by background jobs
			continue

		if not has_consumer_access(consumer=consumer, update_log=d):
			continue

		if not is_consumer_uptodate(d, consumer):
			to_update_history.append((d.ref_doctype, d.docname))
			# get_unread_update_logs will have the current log
			old_logs = get_unread_update_logs(consumer.name, d.ref_doctype, d.docname)
			if old_logs:
				old_logs.reverse()
				result.extend(old_logs)
		else:
			result.append(d)

	# mark everything we are about to return as consumed by this consumer
	for d in result:
		mark_consumer_read(update_log_name=d.name, consumer_name=consumer.name)

	# logs were fetched newest-first; reverse so the consumer applies them
	# in chronological order
	result.reverse()
	return result
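My working theory: the frappe.get_list call above loads every matching Event Update Log in one shot and then runs has_consumer_access / is_consumer_uptodate row by row, so a day's backlog blows past the timeout. A rough batched variant I am considering (batch_size is my own hypothetical parameter, not something the framework provides) would look like this:

import frappe

@frappe.whitelist()
def get_update_logs_for_consumer_batched(event_consumer, doctypes, last_update, batch_size=500):
	"""Hypothetical variant: return at most batch_size logs per call.

	The consumer keeps calling with the creation timestamp of the last
	log it received until it gets an empty list back.
	"""
	if isinstance(doctypes, str):
		doctypes = frappe.parse_json(doctypes)

	docs = frappe.get_list(
		doctype="Event Update Log",
		filters={"ref_doctype": ("in", doctypes), "creation": (">", last_update)},
		fields=["update_type", "ref_doctype", "docname", "data", "name", "creation"],
		order_by="creation asc",  # oldest first so batches stay contiguous
		limit_page_length=int(batch_size),
	)
	# ...then apply the same access/up-to-date checks and mark_consumer_read
	# calls as in the original, just on this much smaller set
	return docs

Separately, since the query filters on creation, an index might help; frappe.db.add_index("Event Update Log", ["creation"]) is available, though I have not measured the impact. Would batching like this break any ordering guarantees that Event Streaming relies on?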