design patterns - Python Pool apply_async and map_async do not block on full queue


I am fairly new to Python. I am using the multiprocessing module for reading lines of text on stdin, converting them in some way, and writing them into a database. Here's a snippet of my code:

    batch = []
    pool = multiprocessing.Pool(20)
    i = 0
    for i, content in enumerate(sys.stdin):
        batch.append(content)
        if len(batch) >= 10000:
            pool.apply_async(insert, args=(batch, i+1))
            batch = []
    pool.apply_async(insert, args=(batch, i))
    pool.close()
    pool.join()

Now this all works fine, until I process huge input files (hundreds of millions of lines) that I pipe into my Python program. At some point, when my database gets slower, I see the memory getting full.

After some playing around, it turned out that pool.apply_async as well as pool.map_async never block, so the queue of calls waiting to be processed grows bigger and bigger.
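A minimal sketch of that behavior, for illustration only. It uses multiprocessing.pool.ThreadPool, which exposes the same apply_async interface, so that the tasks can be held back with a threading.Event. The helper name count_unfinished_after_submit and the task counts are hypothetical and not part of my program.

```python
import threading
from multiprocessing.pool import ThreadPool


def count_unfinished_after_submit(n_tasks=100, workers=2):
    """Submit n_tasks blocked tasks; report how many are pending right after."""
    gate = threading.Event()

    def blocked_task(n):
        gate.wait()  # every task waits here until the gate is opened
        return n

    pool = ThreadPool(workers)
    # None of these calls blocks, although only `workers` tasks can run at once.
    results = [pool.apply_async(blocked_task, (n,)) for n in range(n_tasks)]
    pending = sum(1 for r in results if not r.ready())
    gate.set()  # let the workers drain the backlog
    values = [r.get() for r in results]
    pool.close()
    pool.join()
    return pending, values


if __name__ == "__main__":
    pending, values = count_unfinished_after_submit()
    print(pending, len(values))
```

Every submission returns while all tasks are still unfinished, which is the unbounded backlog that fills the memory when the consumer is slow.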

What is the correct approach to my problem? I would expect a parameter that I can set which makes the pool.apply_async call block as soon as a certain queue length has been reached. AFAIR, in Java one can give the ThreadPoolExecutor a BlockingQueue with a fixed length for that purpose.

Thanks!

Just in case someone ends up here, this is how I solved the problem: I stopped using multiprocessing.Pool. Here is how I do it now:

    # set the number of concurrent processes that insert data into the db
    processes = multiprocessing.cpu_count() * 2

    # set up the bounded batch queue
    queue = multiprocessing.Queue(processes * 2)

    # start the worker processes
    for _ in range(processes):
        multiprocessing.Process(target=insert, args=(queue,)).start()

    # fill the queue with batches; put() blocks while the queue is full
    batch = []
    for i, content in enumerate(sys.stdin):
        batch.append(content)
        if len(batch) >= 10000:
            queue.put((batch, i+1))
            batch = []
    if batch:
        queue.put((batch, i+1))

    # stop the processes with one poison pill per worker
    for _ in range(processes):
        queue.put((None, None))

    print("all done.")
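The back-pressure in the code above comes from the bounded queue: once it holds its maximum number of items, put() waits until a worker takes one out. A small sketch of that, assuming a capacity of two and a short timeout so the full queue shows up as a queue.Full exception instead of an endless wait. The helper names try_put and demo are hypothetical.

```python
import multiprocessing
import queue


def try_put(q, item, timeout=0.1):
    """Return True if item was enqueued, False if the queue stayed full."""
    try:
        q.put(item, timeout=timeout)
        return True
    except queue.Full:
        return False


def demo():
    q = multiprocessing.Queue(2)  # room for two batches only
    outcomes = [try_put(q, n) for n in range(3)]  # the third put finds it full
    drained = [q.get(), q.get()]  # taking items out frees the slots again
    return outcomes, drained


if __name__ == "__main__":
    print(demo())
```

Without the timeout, the third put() would simply wait, which is what keeps the reader of stdin from running ahead of the database.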

In the insert method, the processing of each batch is wrapped in a loop that pulls from the queue until it receives the poison pill:

    while True:
        batch, end = queue.get()
        if not batch and not end:
            break  # poison pill! complete!
        # [process batch]
    print('worker done.')
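For anyone who would rather keep multiprocessing.Pool, a commonly used alternative is to throttle the submissions with a semaphore that is released from the apply_async callbacks. This is only a sketch under assumptions, not what I ended up using: submit_bounded and max_pending are hypothetical names, insert is a stand-in that just returns the batch size, and error_callback requires Python 3.

```python
import multiprocessing
import threading


def insert(batch, end):
    """Hypothetical stand-in for the real database insert."""
    return len(batch)


def submit_bounded(pool, func, jobs, max_pending):
    """Call pool.apply_async per job, blocking while max_pending are unfinished."""
    slots = threading.BoundedSemaphore(max_pending)

    def release(_result_or_error):
        slots.release()  # runs in the parent when a job finishes or fails

    results = []
    for args in jobs:
        slots.acquire()  # blocks the producer once the limit is reached
        results.append(
            pool.apply_async(func, args, callback=release, error_callback=release)
        )
    return [r.get() for r in results]  # get() re-raises any worker error


if __name__ == "__main__":
    # Five small batches generated lazily, so at most max_pending exist at once.
    jobs = ((["line\n"] * 10, end) for end in range(10, 60, 10))
    with multiprocessing.Pool(2) as pool:
        print(submit_bounded(pool, insert, jobs, max_pending=4))
```

The sketch keeps every AsyncResult so that errors surface at the end. For a very long input it may be better to drop the results list and handle failures inside the error callback.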
