How to import/write data into ERPNext with Python parallelism

Hi dear experts,
I am trying to import a lot of data from an external source into ERPNext. Each record needs a little business logic: query some configuration settings in ERPNext, then write (insert or update) records into ERPNext. Each record can be processed independently, without affecting the others.
I tried a Python multiprocessing thread pool, but it doesn't really work. It looks to me like none of the Frappe-related objects can be used inside the threads.
Could you please give me a hint on how to make this work? Is there another approach for running DB queries/writes in parallel?
Thanks.

from multiprocessing.pool import ThreadPool
import threading

import frappe

logger = frappe.logger()

def getItem(item_name):
    doc = frappe.get_doc("Item", item_name)
    logger.info("Succeeded in Multi-Processing: " + threading.current_thread().name + " " + str(doc.valuation_rate))
    logger.info("Succeeded in Multi-Processing: " + str(doc.valuation_rate))

def run_multi_threads():
    pool_size = 10
    pool = ThreadPool(pool_size)
    items = ["XYZ", "ABC"]
    pool.map(getItem, items)

The exception shows:
Traceback (most recent call last):
  File "/Documents/Development/ERP/deploy/dev/env/lib/python3.11/site-packages/rq/worker.py", line 1428, in perform_job
    rv = job.perform()
         ^^^^^^^^^^^^^
  File "/dev/env/lib/python3.11/site-packages/rq/job.py", line 1278, in perform
    self._result = self._execute()
                   ^^^^^^^^^^^^^^^
  File "/dev/env/lib/python3.11/site-packages/rq/job.py", line 1315, in _execute
    result = self.func(*self.args, **self.kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/dev/apps/frappe/frappe/utils/background_jobs.py", line 206, in execute_job
    retval = method(**kwargs)
             ^^^^^^^^^^^^^^^^
  File "/dev/apps/test/test/utils/test_multi_threads.py", line 22, in run_multi_threads
    pool.map(getItem,items)
  File "/Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/multiprocessing/pool.py", line 367, in map
    return self._map_async(func, iterable, mapstar, chunksize).get()
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/multiprocessing/pool.py", line 774, in get
    raise self._value
  File "/Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/multiprocessing/pool.py", line 125, in worker
    result = (True, func(*args, **kwds))
                    ^^^^^^^^^^^^^^^^^^^
  File "/Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/multiprocessing/pool.py", line 48, in mapstar
    return list(map(*args))
           ^^^^^^^^^^^^^^^^
  File "/deploy/dev/apps/test/test/utils/test_multi_threads.py", line 13, in getItem
    doc=frappe.get_doc('Item',item_name)
        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/dev/apps/frappe/frappe/__init__.py", line 1255, in get_doc
    doc = frappe.model.document.get_doc(*args, **kwargs)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/dev/apps/frappe/frappe/model/document.py", line 80, in get_doc
    controller = get_controller(doctype)
                 ^^^^^^^^^^^^^^^^^^^^^^^
  File "/dev/apps/frappe/frappe/model/base_document.py", line 64, in get_controller
    if frappe.local.dev_server or frappe.flags.in_migrate:
       ^^^^^^^^^^^^^^^^^^^^^^^
  File "/dev/env/lib/python3.11/site-packages/werkzeug/local.py", line 86, in __getattr__
    raise AttributeError(name)
AttributeError: dev_server

I've never tried using Frappe/ERPNext with multiprocessing thread pools.
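For what it's worth, the traceback points at the root cause: frappe.local is a werkzeug Local, so every new thread starts with an empty context and no database connection. If you do want to experiment with threads, each thread would have to set up its own site context first. A minimal, untested sketch of that idea ("mysite.local" is a placeholder site name, and this assumes the code runs from the bench's sites directory):

from multiprocessing.pool import ThreadPool

import frappe

SITE = "mysite.local"  # placeholder: your site name

def get_item(item_name):
    # frappe.local is bound per thread, so each worker thread must
    # build its own site context and database connection.
    frappe.init(site=SITE)
    frappe.connect()
    try:
        doc = frappe.get_doc("Item", item_name)
        print(doc.valuation_rate)
        frappe.db.commit()  # persist any writes made in this thread
    finally:
        frappe.destroy()  # close this thread's connection and context

def run_multi_threads():
    with ThreadPool(10) as pool:
        pool.map(get_item, ["XYZ", "ABC"])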

My suggestion is to solve this using Background Workers and Queues.
https://frappeframework.com/docs/user/en/api/background_jobs

The idea is this:

  • You can have as many Queues as you want in Frappe Framework. By default, you get 'short', 'default' and 'long' queues.
  • You can assign multiple Workers to each Queue. By doing so, you can achieve concurrency/parallelism.
  • You can enqueue any function (with any arguments you want) using the frappe.enqueue() function.

So, you might write some code like this:

  1. Create a function that can process a single record.
def load_my_record(some_dictionary_object: dict):
    """
    Do whatever you need to with your data.
    """
  2. Next, loop through every row in your data import file/table.
  3. For "each_row" in your loop, add your function to the queue:
frappe.enqueue(
    method="path_to_my_app.path_to_my_module.load_my_record",
    some_dictionary_object=each_row,
    queue="long"
)
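Putting those steps together, here is a minimal sketch; the CSV file name and the field handling are placeholders, and "path_to_my_app.path_to_my_module" stands in for the real dotted path to your module:

import csv

import frappe

def load_my_record(some_dictionary_object: dict):
    # Placeholder business logic: read whatever configuration you
    # need, then insert or update the record.
    doc = frappe.new_doc("Item")
    doc.update(some_dictionary_object)
    doc.insert()

def enqueue_import(path="import.csv"):  # placeholder file name
    with open(path, newline="") as f:
        for each_row in csv.DictReader(f):
            frappe.enqueue(
                method="path_to_my_app.path_to_my_module.load_my_record",
                some_dictionary_object=each_row,
                queue="long",
            )

Each enqueued job then runs in a worker process with its own Frappe context and DB connection, so no thread handling is needed in your code.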

Your records will now be imported/written in parallel. The more Workers assigned to that queue, the more parallel operations.
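If you need more throughput, you can point additional worker processes at the same queue, for example by running bench worker --queue long (how you keep them running permanently depends on your Procfile or supervisor setup).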


OK, thanks a lot, Brian. I will try it and come back!