Python - Multithreading
When to use which
see [1]
use threading if your program is I/O/network limited
use multiprocessing if your program is CPU limited (note: here the entire memory is copied into each subprocess)
Example for both
import math
import multiprocessing
import os
import queue
import threading
import time

# Seconds an idle multiprocessing worker waits for more work before giving up.
# Only relevant when the caller did not enqueue a `None` stop marker.
_WORKER_IDLE_TIMEOUT = 1.0


def _num_processes(num_tasks: int) -> int:
    """Return the worker-process count for *num_tasks* tasks.

    Uses all cores but one, never more processes than tasks, and never fewer
    than one (cpu_count() - 1 is 0 on a single-core machine).
    """
    return max(1, min(multiprocessing.cpu_count() - 1, num_tasks))


def worker_core(i: int, s: str) -> tuple:
    """Simulate one unit of work; used in worker_core_cpu and worker_core_io.

    Returns the tuple (i, s, pid of the executing process) after sleeping
    for 0.1 seconds.
    """
    result = i, s, os.getpid()
    time.sleep(.1)
    return result


def worker_core_cpu(i: int, s: str) -> tuple:
    """Simulate a CPU limited task; used in all CPU limited workers.

    Returns the same tuple as worker_core.
    """
    # The sum is discarded on purpose: computing it only generates CPU load.
    sum(math.sqrt(i + j) for j in range(1_000))
    return worker_core(i=i, s=s)


def worker_core_io(i: int, s: str) -> tuple:
    """Simulate an I/O limited task; used in all I/O limited workers.

    Returns the same tuple as worker_core.
    """
    time.sleep(.1)  # stands in for waiting on I/O
    return worker_core(i=i, s=s)


def worker_multiprocessing_1(i: int, s: str) -> tuple:
    """Worker used in multiprocessing_1."""
    return worker_core_cpu(i=i, s=s)


def worker_multiprocessing_2(q_work: multiprocessing.Queue,
                             q_results: multiprocessing.Queue):
    """Worker used in multiprocessing_2 -> not faster than multiprocessing_1.

    Takes (i, s) items from q_work and puts each result on q_results. Stops
    when it receives the stop marker `None`, or when no item arrives within
    _WORKER_IDLE_TIMEOUT seconds.
    """
    while True:
        # Do not test `q_work.empty()` first: another process may take the
        # last item between the test and the `get()`, which would block here
        # forever.
        try:
            item = q_work.get(timeout=_WORKER_IDLE_TIMEOUT)
        except queue.Empty:
            return
        if item is None:
            return
        i, s = item
        q_results.put(worker_core_cpu(i=i, s=s))


def worker_threading_1(q_work: queue.Queue, results: dict):
    """Worker used in threading_1.

    Takes (i, s) items from q_work until it is empty and stores each result
    in results[i].
    """
    while True:
        try:
            i, s = q_work.get_nowait()
        except queue.Empty:
            return
        try:
            results[i] = worker_core_io(i=i, s=s)
        finally:
            # Always acknowledge the item, otherwise q_work.join() in the
            # caller never returns when a task fails.
            q_work.task_done()


def multiprocessing_1(l_pile_of_work: list):
    """Run CPU limited tasks in a process pool.

    l_pile_of_work is a list of (i, s) argument tuples. Returns the sorted
    list of worker results (sorted by i, the first tuple element).
    """
    if not l_pile_of_work:
        return []
    # The context manager releases the pool's processes when done.
    with multiprocessing.Pool(
            processes=_num_processes(len(l_pile_of_work))) as pool:
        # use map for 1 argument, use starmap for multiple arguments for
        # worker function
        # here each item in l_pile_of_work is a tuple of 2 elements -> starmap
        l_results_unsorted = pool.starmap(
            func=worker_multiprocessing_1, iterable=l_pile_of_work)
    return sorted(l_results_unsorted)  # sort by id


def multiprocessing_2(l_pile_of_work: list):
    """Run CPU limited tasks on a fixed set of processes fed by a queue.

    result: slower than multiprocessing_1

    l_pile_of_work is a list of (i, s) argument tuples. Returns the sorted
    list of worker results (sorted by i, the first tuple element).
    """
    num_tasks = len(l_pile_of_work)
    if num_tasks == 0:
        return []
    num_processes = _num_processes(num_tasks)

    q_pile_of_work = multiprocessing.Queue()
    q_results = multiprocessing.Queue()
    for params in l_pile_of_work:
        q_pile_of_work.put(params)
    # One stop marker per process, queued behind all the real work.
    for _ in range(num_processes):
        q_pile_of_work.put(None)

    l_processes = [
        multiprocessing.Process(
            name='myProcess-' + str(i),
            target=worker_multiprocessing_2,
            args=(q_pile_of_work, q_results),
            daemon=True)
        for i in range(num_processes)
    ]
    for p in l_processes:
        p.start()
    print("started")

    # Fetch exactly one result per task instead of polling full()/empty(),
    # which are not reliable for multiprocessing queues. The results must be
    # consumed before joining: a process that has put items on a queue does
    # not exit until they are flushed.
    l_results_unsorted = [q_results.get() for _ in range(num_tasks)]

    for p in l_processes:
        p.join()
        p.close()
        print("closed")

    return sorted(l_results_unsorted)  # sort by id


def threading_1(l_pile_of_work: list, num_threads: int):
    """Run I/O limited tasks on a set of threads fed by a queue.

    l_pile_of_work is a list of (i, s) argument tuples; num_threads is the
    upper limit of threads to start. Returns the sorted list of worker
    results (sorted by i, the first tuple element).
    """
    q_pile_of_work = queue.Queue()  # maxsize=0 -> unlimited
    for params in l_pile_of_work:
        q_pile_of_work.put(params)

    d_results = {}  # threads can write into dict
    # At least one thread when there is work (else join() would hang), and
    # never more threads than tasks.
    thread_count = min(max(1, num_threads), len(l_pile_of_work))
    l_threads = []
    for i in range(thread_count):
        t = threading.Thread(name='myThread-' + str(i),
                             target=worker_threading_1,
                             args=(q_pile_of_work, d_results),
                             daemon=True)
        l_threads.append(t)
        t.start()

    q_pile_of_work.join()  # wait until every queued item is processed
    for t in l_threads:
        t.join()

    return sorted(d_results.values())  # sort by id


if __name__ == '__main__':
    loops = 1_000
    # use index as first parameter
    l_pile_of_work = [(i, "n" + str(i)) for i in range(loops)]

    time_start = time.perf_counter()

    # Pick one of the three implementations:
    # results = multiprocessing_1(l_pile_of_work)
    # results = multiprocessing_2(l_pile_of_work)
    results = threading_1(l_pile_of_work, num_threads=100)

    duration = time.perf_counter() - time_start
    print("%d sec = %.1f min" % (duration, duration/60))
Threading V2: 18.04.2020
see [2]
import concurrent.futures
import multiprocessing as mp  # for fetching number of CPUs
import threading


def series_of_fits_multi_threading(data: list, fit_range: int = 7, max_days_past=14) -> dict:
    """Run one fit per "last day" in a thread pool and collect the results.

    One task is submitted for each last_day_for_fit in
    0, -1, ..., -(max_days_past - 1).

    Returns a dict mapping last_day_for_fit to the value at index 1 of the
    fit result's 'fit_res' entry, rounded to one decimal. Days whose fit
    raised an exception or returned an empty result are left out.
    """
    fit_series_res = {}
    l_last_days_for_fit = range(0, -max_days_past, -1)
    with concurrent.futures.ThreadPoolExecutor(max_workers=mp.cpu_count()) as executor:
        # Start the fits and remember which day each future belongs to
        future_to_day = {
            executor.submit(series_of_fits_worker_thread,
                            data, fit_range, last_day_for_fit): last_day_for_fit
            for last_day_for_fit in l_last_days_for_fit
        }
        for future in concurrent.futures.as_completed(future_to_day):
            last_day_for_fit = future_to_day[future]
            try:
                d_this_fit_result = future.result()
            except Exception as exc:
                # Best effort: report the failed day and carry on with the
                # remaining fits.
                print('%r generated an exception: %s' % (last_day_for_fit, exc))
                continue
            if d_this_fit_result:
                fit_series_res[last_day_for_fit] = round(
                    d_this_fit_result['fit_res'][1], 1)
    return fit_series_res


def series_of_fits_worker_thread(data: list, fit_range: int, last_day_for_fit: int):
    """Fit the range (last_day_for_fit - fit_range, last_day_for_fit) of data.

    Returns whatever fit_routine returns.
    NOTE(review): fit_routine is not defined in this snippet; presumably it
    returns a dict with a 'fit_res' sequence - confirm against its definition.
    """
    # print(threading.current_thread().name, 'Starting')
    return fit_routine(
        data, (last_day_for_fit - fit_range, last_day_for_fit))
V1
import logging
import multiprocessing as mp  # for fetching number of CPUs
import threading
import time

# Thread-safety notes:
# - A single operation on a built-in container (e.g. list.pop(), or
#   dict[key] = value) is atomic in CPython, so one such call on its own
#   needs no lock.
# - A *sequence* of operations is not atomic: "check whether the list is
#   empty, then pop" and "dic[key] += x" (read, add, write back) can both be
#   interleaved with other threads. Those need a lock or try/except.
# - A function can modify a module-level dict in place, whereas re-assigning
#   a module-level integer would require a `global` statement. That is why
#   the counter lives in a dict here.

# Logging is nicer than print, as it can automatically add the thread name
logging.basicConfig(level=logging.DEBUG,
                    format='%(asctime)s %(levelname)s %(threadName)s: %(message)s',
                    )

# List of "work" to do
listPileOfWork = list(range(7))

# Shared result; every update is guarded by _counter_lock
dic = {"Counter": 0}
_counter_lock = threading.Lock()


def worker():
    """Take items from listPileOfWork until it is empty and add them up.

    Each item is added to dic["Counter"]; the 2 second sleep stands in for
    the actual work.
    """
    logging.info('Starting')
    while True:
        # Pop first and handle the empty case, instead of testing the list
        # and then popping: another thread may take the last item in between.
        try:
            x = listPileOfWork.pop(0)
        except IndexError:
            break
        logging.info("Took" + str(x))
        # "+=" is a read-modify-write; without the lock two threads could
        # read the same old value and one update would be lost.
        with _counter_lock:
            dic["Counter"] += x
        time.sleep(2)
    logging.info('No more work to do')


threads = []  # List of threads
for i in range(mp.cpu_count()):
    t = threading.Thread(name='myThread'+str(i), target=worker)
    threads.append(t)
    t.start()

# Wait for all threads to complete
for t in threads:
    t.join()

logging.info('All threads done')
Multiprocessing
see [3]
V2
import multiprocessing
import time
from os import getpid


def worker(i, s):
    """Print i, sleep for 0.1 seconds and return (i, s, pid of this process)."""
    print(i)
    time.sleep(.1)
    return i, s, getpid()


if __name__ == '__main__':
    # One (i, s) argument tuple per task
    tasks = [(i, "n" + str(i)) for i in range(100)]

    # The context manager shuts the pool down once the results are in.
    with multiprocessing.Pool(processes=multiprocessing.cpu_count()) as pool:
        # map for 1 argument, starmap for multiple arguments at worker
        results_unsorted = pool.starmap(worker, tasks)

    for res in sorted(results_unsorted):
        print('task %d, name %s was done in process %d' % res)
V1
From [4]: The threading module uses threads, the multiprocessing module uses processes. The difference is that threads run in the same memory space, while processes have separate memory. This makes it a bit harder to share objects between processes with multiprocessing. Since threads use the same memory, precautions have to be taken or two threads will write to the same memory at the same time. This is what the global interpreter lock is for. Spawning processes is a bit slower than spawning threads. Once they are running, there is not much difference.
Example from [5]
Problem: KeyboardInterrupt (CTRL+C) does not close the program completely; see [6] as a starting point for a solution.
import signal
from multiprocessing import Pool


def _ignore_sigint():
    """Pool initializer: make worker processes ignore CTRL+C.

    Only the parent process then reacts to KeyboardInterrupt and shuts the
    pool down.
    """
    signal.signal(signal.SIGINT, signal.SIG_IGN)


def escapes(cr, ci, it):
    """Does iterating z <- z^2 + c escape within it iterations?

    cr and ci are the real and imaginary part of c. Returns True as soon as
    |z|^2 exceeds 4, otherwise False.
    """
    zr = 0.0
    zi = 0.0
    for _ in range(it):
        # z <- z^2 + c
        zr, zi = zr*zr - zi*zi + cr, 2*zr*zi + ci
        if zr*zr + zi*zi > 4:
            return True
    return False


def toChar(p):
    """Return " " for a true value (escaped) and "X" otherwise."""
    return " " if p else "X"


def doRow(args):
    """Calculate one row of the output.

    args is the tuple (xmin, xmax, xstep, ymin, ymax, ystep, iterations, yc),
    where yc is the index of the row. Returns the row as a string with one
    character per column.
    """
    # Python 3 has no tuple parameters, so unpack explicitly.
    xmin, xmax, xstep, ymin, ymax, ystep, iterations, yc = args
    # "/" is true division here, also for integer bounds.
    y = yc*(ymax-ymin)/ystep + ymin
    return "".join(
        toChar(escapes(xc*(xmax-xmin)/xstep + xmin, y, iterations))
        for xc in range(xstep))


def mandel(xmin, xmax, xstep, ymin, ymax, ystep, iterations):
    """Calculate and print a Mandelbrot set, one pool task per row."""
    # Pool() asks the OS for the number of CPUs. Leaving the with-block,
    # also via KeyboardInterrupt, terminates the worker processes.
    with Pool(initializer=_ignore_sigint) as pool:
        results = [
            pool.apply_async(
                doRow,
                ((xmin, xmax, xstep, ymin, ymax, ystep, iterations, yc),))
            for yc in range(ystep)
        ]
        # Print the rows in order, waiting for each one as needed
        for res in results:
            print(res.get())


if __name__ == "__main__":
    mandel(-2.0, 1.0, 80, -1.0, 1.0, 24, 20000)