I have a fairly large multi-location ERPNext deployment in which each instance needs to stay in bidirectional sync with a Head Office instance. I am using Frappe's Event Streaming for replication.

Generally it works fine as long as the event backlog stays under about an hour. But when a location accumulates a backlog of more than a day (hundreds of transactions), I keep getting a timeout error when the consumer fetches events via `pull_from_node`.
One clarification: if I simply fetch the Event Update Logs remotely with a plain `client.post_request`, I can retrieve all of them without any trouble. So I believe the bottleneck is `get_update_logs_for_consumer`, which takes forever to process.
Any ideas on how to fix this would be appreciated.
PS: At peak, the Head Office pulls around 5,000 events per hour. Out of curiosity: is Event Streaming even suitable for this kind of volume? For reference, here is the server-side function:
```python
@frappe.whitelist()
def get_update_logs_for_consumer(event_consumer, doctypes, last_update):
	"""
	Fetches all the UpdateLogs for the consumer
	It will inject old un-consumed Update Logs if a doc was just found to be accessible to the Consumer
	"""
	if isinstance(doctypes, str):
		doctypes = frappe.parse_json(doctypes)

	from frappe.event_streaming.doctype.event_consumer.event_consumer import has_consumer_access

	consumer = frappe.get_doc("Event Consumer", event_consumer)
	docs = frappe.get_list(
		doctype="Event Update Log",
		filters={"ref_doctype": ("in", doctypes), "creation": (">", last_update)},
		fields=["update_type", "ref_doctype", "docname", "data", "name", "creation"],
		order_by="creation desc",
	)

	result = []
	to_update_history = []
	for d in docs:
		if (d.ref_doctype, d.docname) in to_update_history:
			# will be notified by background jobs
			continue

		if not has_consumer_access(consumer=consumer, update_log=d):
			continue

		if not is_consumer_uptodate(d, consumer):
			to_update_history.append((d.ref_doctype, d.docname))
			# get_unread_update_logs will have the current log
			old_logs = get_unread_update_logs(consumer.name, d.ref_doctype, d.docname)
			if old_logs:
				old_logs.reverse()
				result.extend(old_logs)
		else:
			result.append(d)

	for d in result:
		mark_consumer_read(update_log_name=d.name, consumer_name=consumer.name)

	result.reverse()
	return result
```
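One workaround I am considering is to page through the backlog in fixed-size batches instead of requesting everything since `last_update` in one call, so each request stays well under the HTTP timeout. Below is a generic sketch of the idea (not Frappe's API): `fetch_page` is a hypothetical stand-in for whatever remote call you use (e.g. a `client.post_request` with offset/limit parameters, assuming the server side supports them).

```python
def fetch_in_batches(fetch_page, batch_size=500):
	"""Yield events page by page until the producer runs out.

	fetch_page(offset, limit) must return a list of events,
	shorter than `limit` only on the last page.
	"""
	offset = 0
	while True:
		page = fetch_page(offset, batch_size)
		yield from page
		if len(page) < batch_size:
			break  # last (partial) page reached
		offset += batch_size
```

With a day-long backlog this turns one multi-minute request into many short ones, each of which can succeed or be retried independently.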
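Another thing I noticed in the function above: `to_update_history` is a plain list, so `(d.ref_doctype, d.docname) in to_update_history` rescans it for every log, which is O(n²) over a large backlog (the per-log `has_consumer_access` and `mark_consumer_read` calls are probably the bigger cost, but this adds up too). A set makes the membership check O(1). Here is a minimal, generic illustration of the same dedup pattern, not a patch to Frappe itself:

```python
def dedupe_pairs(pairs):
	"""Keep the first occurrence of each (doctype, docname) pair.

	Assumes pairs arrive newest-first, as in the `creation desc` query,
	so the first occurrence is the newest log for that document.
	"""
	seen = set()  # O(1) membership checks, vs. O(n) for a list
	kept = []
	for pair in pairs:
		if pair in seen:
			continue
		seen.add(pair)
		kept.append(pair)
	return kept
```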