Python: Pool apply_async and map_async do not block on full queue
I am new to Python. I am using the multiprocessing module to read lines of text on stdin, convert them in some way, and write them to a database. Here's a snippet of my code:
    import multiprocessing
    import sys

    batch = []
    pool = multiprocessing.Pool(20)

    i = 0
    for i, content in enumerate(sys.stdin):
        batch.append(content)
        if len(batch) >= 10000:
            pool.apply_async(insert, args=(batch, i + 1))
            batch = []
    pool.apply_async(insert, args=(batch, i))  # flush the last partial batch
    pool.close()
    pool.join()
Now this works fine, until I process huge input files (hundreds of millions of lines) that I pipe into my Python program. At some point, when the database gets slower, I see the memory filling up.
After playing around, it turned out that pool.apply_async, as well as pool.map_async, never block; the queue of calls waiting to be processed just grows bigger and bigger.
What is the correct approach to this problem? I would expect a parameter I can set so that the pool.apply_async call blocks as soon as a certain queue length has been reached. AFAIR in Java one can give the ThreadPoolExecutor a BlockingQueue with a fixed length for exactly that purpose.
Thanks!
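For the record, Pool.apply_async has no built-in backpressure parameter. One common workaround is to throttle submissions with a semaphore that each finished task releases from its callback (callbacks run in a thread of the parent process, so a plain threading semaphore is enough). A minimal sketch, assuming Python 3; MAX_PENDING and task_done are illustrative names, not part of the multiprocessing API:

    import multiprocessing
    import sys
    import threading

    MAX_PENDING = 50  # illustrative cap on batches waiting in the pool's queue

    def insert(batch, n):
        pass  # write the batch to the database

    if __name__ == '__main__':
        # Counts outstanding tasks; acquire() blocks once MAX_PENDING are queued.
        pending = threading.BoundedSemaphore(MAX_PENDING)

        def task_done(result):
            # Runs in the parent's result-handler thread for success and error alike.
            pending.release()

        pool = multiprocessing.Pool(20)
        batch = []
        i = 0
        for i, content in enumerate(sys.stdin):
            batch.append(content)
            if len(batch) >= 10000:
                pending.acquire()  # stalls the producer when the pool falls behind
                pool.apply_async(insert, args=(batch, i + 1),
                                 callback=task_done, error_callback=task_done)
                batch = []
        if batch:
            pending.acquire()
            pool.apply_async(insert, args=(batch, i + 1),
                             callback=task_done, error_callback=task_done)
        pool.close()
        pool.join()

With this in place, the main loop blocks on pending.acquire() once MAX_PENDING batches are outstanding, instead of letting the pool's internal task queue grow without bound.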
Just in case someone ends up here: this is how I solved the problem. I stopped using multiprocessing.Pool. Here is how I do it now:
    import multiprocessing
    import sys

    # set the number of concurrent processes that insert the data into the db
    processes = multiprocessing.cpu_count() * 2

    # set up the batch queue; it is bounded, so put() blocks when it is full
    queue = multiprocessing.Queue(processes * 2)

    # start the worker processes
    for _ in range(processes):
        multiprocessing.Process(target=insert, args=(queue,)).start()

    # fill the queue with batches
    batch = []
    for i, content in enumerate(sys.stdin):
        batch.append(content)
        if len(batch) >= 10000:
            queue.put((batch, i + 1))
            batch = []
    if batch:
        queue.put((batch, i + 1))

    # stop the workers with one poison pill per process
    for _ in range(processes):
        queue.put((None, None))

    print('all done.')
In the insert method, the processing of each batch is wrapped in a loop that pulls from the queue until it receives the poison pill:
    while True:
        batch, end = queue.get()
        if not batch and not end:
            return  # poison pill! complete!
        # [process batch]
        print('worker done.')
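Put together as a self-contained sketch, assuming Python 3: the if __name__ == '__main__' guard (required on platforms that spawn workers), the explicit join() calls, and the BATCH_SIZE constant are additions; the actual batch processing is left abstract, as in the original:

    import multiprocessing
    import sys

    BATCH_SIZE = 10000

    def insert(queue):
        # Worker: pull batches until the (None, None) poison pill arrives.
        while True:
            batch, end = queue.get()
            if not batch and not end:
                return  # poison pill! complete!
            # [process batch]
            print('worker done.')

    if __name__ == '__main__':
        processes = multiprocessing.cpu_count() * 2
        # Bounded queue: put() blocks when full, throttling the producer.
        queue = multiprocessing.Queue(processes * 2)

        workers = [multiprocessing.Process(target=insert, args=(queue,))
                   for _ in range(processes)]
        for w in workers:
            w.start()

        batch = []
        i = 0
        for i, content in enumerate(sys.stdin):
            batch.append(content)
            if len(batch) >= BATCH_SIZE:
                queue.put((batch, i + 1))
                batch = []
        if batch:
            queue.put((batch, i + 1))

        for _ in range(processes):
            queue.put((None, None))  # one pill per worker
        for w in workers:
            w.join()
        print('all done.')

Because the queue is bounded, queue.put() in the producer blocks as soon as the workers fall behind, which is exactly the backpressure behavior the question asked for.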