Python multiple producer/consumer queue

Below is a simple skeleton for a python queue that is both fed by and consumed by multiple different threads.

We run five threads, each of which populates a queue with random integers.

Lastly, we sleep a couple of seconds between each put, to simulate an ongoing workload.

Our popper thread, which reads items off the queue, will wait for an item in the queue for ten seconds before throwing a Queue.Empty exception. We also print the thread name to prove each thread is doing its fair share of the workload.
It will then loop until all the pusher threads have exited (as defined by our running count variable).

import Queue, random, time, thread, sys
from threading import Thread

# plock is held around the "pusher ..." / "popper ..." prints so that lines
# written by different threads do not interleave.
plock = thread.allocate_lock()
# rlock guards every update of the `running` counter below.
rlock = thread.allocate_lock()

# The single queue shared by all pusher and popper threads.
q = Queue.Queue()
# Number of pusher threads currently inside run(); poppers loop while it is > 0.
running = 0

#-----------------------------------------

class pusher(Thread):
  """Producer thread: puts three random integers on the shared queue.

  While run() is executing, the thread is counted in the module-level
  `running` total, which the popper threads poll to decide when to stop.
  """

  def __init__(self, num=None):
    """Create the producer.

    num -- optional identifier stored on self.num. When omitted, the
           module-level loop variable `i` is used, exactly as the
           original zero-argument constructor did.
    """
    Thread.__init__(self)
    self.num = i if num is None else num
    self.q = q

  def run(self):
    """Push three values, two seconds apart, then deregister."""
    global running
    with rlock:
      running = running + 1
    try:
      for _ in range(3):
        value = random.randint(1, 100)
        # Hold plock only for the print so output lines stay whole.
        with plock:
          print("pusher " + str(value) + " " + self.getName())
        self.q.put(value)
        # Simulate an ongoing workload between puts.
        time.sleep(2)
    finally:
      # Always give the slot back: if the loop raised and the counter stayed
      # elevated, the poppers would never see running == 0 and never exit.
      with rlock:
        running = running - 1

#-----------------------------------------

class popper(Thread):
  """Consumer thread: takes items off the shared queue and prints them.

  The thread keeps running while any pusher is still registered in the
  module-level `running` counter, and afterwards until the queue is empty.
  """

  def __init__(self, num=None):
    """Create the consumer.

    num -- optional identifier stored on self.num. When omitted, the
           module-level loop variable `i` is used, exactly as the
           original zero-argument constructor did.
    """
    Thread.__init__(self)
    self.num = i if num is None else num
    self.q = q

  def run(self):
    """Consume items until the producers are done and the queue is drained."""
    # `running` is only read here, so no `global` declaration is needed.
    # Checking the queue as well stops items from being abandoned when the
    # last pusher exits before its final values have been consumed.
    while running > 0 or not self.q.empty():
      try:
        # Block for at most ten seconds waiting for the next item.
        value = self.q.get(True, 10)
      except Queue.Empty as exc:
        # Only the timeout is expected here; any other exception is a real
        # bug and is allowed to propagate instead of being swallowed.
        with plock:
          print(str(exc))
      else:
        with plock:
          print("popper " + str(value) + " " + self.getName())

#-----------------------------------------

# Start the five producers. The loop variable must stay named `i` at module
# level because the pusher/popper constructors read it.
for i in range(5):
  pusher().start()

# Give the producers time to register themselves in `running` before the
# consumers first test it.
time.sleep(2)

# Start the five consumers.
for i in range(5):
  popper().start()
 

Leave a Reply

Your email address will not be published. Required fields are marked *

This site uses Akismet to reduce spam. Learn how your comment data is processed.