Using the Python thread pool library threadpool

June 12, 2022

What is a thread pool? Why use one? How do you use one?

Introduction

  1.

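    Worker threads (worker): when the pool is created, the specified number of worker threads is started. Each one waits to get tasks from the request queue.

    Tasks (requests): the units of work that the worker threads process. There may be thousands of tasks but only a few worker threads. Tasks are created with makeRequests().

    Request queue (request_queue): holds the pending tasks and is implemented with a Queue.Queue. Worker threads get tasks from this queue and process them.

    Task handler (callable): after a worker thread gets a task, it calls the task's handler, request.callable, which does the actual work and returns the result.

    Result queue (result_queue): when a task finishes, its result is put into the result queue. If the task raised an exception, the exception information is put there instead.

    Exception callback (exc_callback): results are read back from the result queue; if a task raised an exception, its exception callback is called to handle it.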
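The flow described above (tasks into a request queue, workers calling the handler, results and exceptions into a result queue, callbacks dispatched from the result queue) can be sketched with the standard library alone. This is a minimal sketch, not the threadpool library itself: WorkRequest, worker() and run_pool() are hypothetical names, and the callbacks run on the main thread while it drains the result queue.

```python
# Minimal sketch of the pool's data flow; all names here are hypothetical,
# not the threadpool API.
import queue
import sys
import threading


class WorkRequest:
    """A task: the handler to call, its arguments and its two callbacks."""

    def __init__(self, func, args, callback, exc_callback):
        self.func = func
        self.args = args
        self.callback = callback          # called with (request, result) on success
        self.exc_callback = exc_callback  # called with (request, exc_info) on failure
        self.exception = False


def worker(requests_q, results_q, stop):
    """Worker thread body: get a task, run its handler, queue (request, result)."""
    while not stop.is_set():
        try:
            request = requests_q.get(True, 0.05)  # short poll so `stop` is re-checked
        except queue.Empty:
            continue
        try:
            results_q.put((request, request.func(*request.args)))
        except Exception:
            request.exception = True
            results_q.put((request, sys.exc_info()))


def run_pool(func, args_list, num_workers=3):
    """Run func over args_list with a small pool; return (results, errors)."""
    args_list = list(args_list)
    requests_q, results_q = queue.Queue(), queue.Queue()
    stop = threading.Event()
    threads = [threading.Thread(target=worker, args=(requests_q, results_q, stop), daemon=True)
               for _ in range(num_workers)]
    for t in threads:
        t.start()

    results, errors = {}, {}

    def on_result(request, result):
        results[request.args] = result

    def on_error(request, exc_info):
        errors[request.args] = exc_info[0].__name__

    for args in args_list:
        requests_q.put(WorkRequest(func, args, on_result, on_error))

    # Main thread: take every result off the result queue and dispatch it
    # to the success callback or the exception callback.
    for _ in args_list:
        request, outcome = results_q.get()
        if request.exception:
            request.exc_callback(request, outcome)
        else:
            request.callback(request, outcome)

    stop.set()
    for t in threads:
        t.join()
    return results, errors
```

For example, run_pool(lambda x: 10 // x, [(1,), (2,), (0,), (5,)]) puts the three quotients in results and records ZeroDivisionError for (0,) in errors, while the failing task does not stop the other workers.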


Creating the thread pool (ThreadPool(args))

  1.

    Create a pool with num_workers worker threads:

    ```python
    import threadpool

    num_workers = 4  # example value
    task_pool = threadpool.ThreadPool(num_workers)
    ```

    The constructor, as defined in the threadpool module:

    ```python
    class ThreadPool:
        # ... other methods omitted ...

        def __init__(self, num_workers, q_size=0, resq_size=0, poll_timeout=5):
            """Set up the thread pool and start num_workers worker threads.

            ``num_workers`` is the number of worker threads to start initially.

            If ``q_size > 0`` the size of the work *request queue* is limited and
            the thread pool blocks when the queue is full and it tries to put
            more work requests in it (see ``putRequest`` method), unless you also
            use a positive ``timeout`` value for ``putRequest``.

            If ``resq_size > 0`` the size of the *results queue* is limited and the
            worker threads will block when the queue is full and they try to put
            new results in it.

            .. warning:
                If you set both ``q_size`` and ``resq_size`` to ``!= 0`` there is
                the possibility of a deadlock, when the results queue is not pulled
                regularly and too many jobs are put in the work requests queue.
                To prevent this, always set ``timeout > 0`` when calling
                ``ThreadPool.putRequest()`` and catch ``Queue.Full`` exceptions.
            """
            # request queue: tasks created by makeRequests() and submitted with putRequest()
            self._requests_queue = Queue.Queue(q_size)
            # result queue: a (request, result) pair for every finished task
            self._results_queue = Queue.Queue(resq_size)
            # worker threads created by self.createWorkers()
            self.workers = []
            # workers whose dismiss event is set but that have not been joined yet
            self.dismissedWorkers = []
            # dictionary of submitted tasks: requestID -> request
            self.workRequests = {}
            self.createWorkers(num_workers, poll_timeout)
    ```
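The warning in the docstring boils down to one rule: never let a put into a bounded queue block indefinitely. A minimal sketch of that rule with the standard queue module (Python 3's name for Queue); submit_all() is a hypothetical helper, not part of threadpool, and nothing drains the queue here so that the timeout path is exercised.

```python
import queue


def submit_all(tasks, q_size=2, timeout=0.05):
    """Put tasks into a bounded request queue without ever blocking forever.

    Nothing consumes the queue in this sketch, so once q_size tasks are queued
    each further put() waits `timeout` seconds and then raises queue.Full.
    q_size=0 means an unbounded queue, so every put succeeds.
    """
    requests_q = queue.Queue(q_size)  # q_size > 0: bounded, as with ThreadPool(q_size=...)
    accepted, rejected = [], []
    for task in tasks:
        try:
            requests_q.put(task, True, timeout)  # block for at most `timeout` seconds
            accepted.append(task)
        except queue.Full:
            rejected.append(task)  # caller can poll results and retry later
    return accepted, rejected, requests_q.qsize()
```

With threadpool the same idea means passing a positive timeout to putRequest() and catching Queue.Full, then pulling results before retrying.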

  2.

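    The constructor parameters are:

    num_workers: the number of worker threads in the pool.

    q_size: the maximum length of the request queue; the default, 0, means unlimited. If q_size > 0 and the queue is full, putRequest() blocks until a worker takes a task off the queue, unless putRequest() is called with a positive timeout, in which case it raises Queue.Full when the timeout expires.

    resq_size: the maximum length of the result queue; 0, the default, means unlimited. If it is limited, worker threads block when they try to put a result into a full result queue.

    poll_timeout: how many seconds a worker thread waits when the request queue is empty. If no request arrives within poll_timeout, get() raises Queue.Empty and the worker goes back to the top of its loop, which gives it a chance to notice that it has been dismissed.

    The member variables are:

    self._requests_queue: the request queue. Every task created with threadpool.makeRequests() and submitted with putRequest() is put here.

    self._results_queue: the result queue. The (request, result) pair of each finished task is put here.

    self.workers: the list of worker threads created by self.createWorkers().

    self.dismissedWorkers: worker threads whose dismiss event has been set but which have not yet been joined.

    self.workRequests: a dictionary of the tasks submitted to the pool, mapping requestID to request. requestID is the task's unique identifier; by default it is the id() of the WorkRequest object.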
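The poll_timeout behaviour can be shown in isolation. A minimal sketch, assuming a threading.Event as the dismiss flag like the worker threads use; next_request() is a hypothetical helper, not a threadpool function.

```python
import queue
import threading


def next_request(requests_q, dismissed, poll_timeout):
    """Wait for a request, re-checking the dismiss flag every poll_timeout seconds.

    Returns the request, or None if the worker was dismissed while waiting.
    """
    while not dismissed.is_set():
        try:
            return requests_q.get(True, poll_timeout)  # raises queue.Empty on timeout
        except queue.Empty:
            continue  # nothing arrived in time: loop back and check the flag again
    return None
```

A smaller poll_timeout lets a dismissed worker exit sooner at the cost of waking up more often; with the default poll_timeout=5, an idle worker can take up to about five seconds to notice that it was dismissed.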


Starting the worker threads (self.createWorkers(args))

  1.

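    createWorkers() creates the requested number of WorkerThread objects and appends them to self.workers. WorkerThread inherits from threading.Thread, Python's built-in thread class. Its definition follows; as its __init__() shows, each worker is a daemon thread that stores the two queues and the poll timeout, creates its dismiss event, and starts itself immediately.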

  2.

    ```python
    import sys
    import threading

    try:
        import Queue           # Python 2
    except ImportError:
        import queue as Queue  # Python 3


    class WorkerThread(threading.Thread):
        """Background thread connected to the requests/results queues.

        A worker thread sits in the background and picks up work requests from
        one queue and puts the results in another until it is dismissed.
        """

        def __init__(self, requests_queue, results_queue, poll_timeout=5, **kwds):
            """Set up thread in daemonic mode and start it immediately.

            ``requests_queue`` and ``results_queue`` are instances of
            ``Queue.Queue`` passed by the ``ThreadPool`` class when it creates a
            new worker thread.
            """
            threading.Thread.__init__(self, **kwds)
            self.daemon = True                     # daemon thread: does not keep the program alive
            self._requests_queue = requests_queue  # request queue
            self._results_queue = results_queue    # result queue
            self._poll_timeout = poll_timeout      # get() timeout in run(); on timeout the loop starts over
            self._dismissed = threading.Event()    # once set, run() breaks out of its loop and the thread exits
            self.start()

        def run(self):
            """Repeatedly process the job queue until told to exit."""
            while True:
                if self._dismissed.is_set():
                    # we are dismissed, break out of loop
                    break
                # get next work request. If we don't get a new request from the
                # queue after self._poll_timeout seconds, we jump to the start of
                # the while loop again, to give the thread a chance to exit.
                try:
                    request = self._requests_queue.get(True, self._poll_timeout)
                except Queue.Empty:
                    continue  # request queue is empty: start the loop again
                else:
                    if self._dismissed.is_set():
                        # we are dismissed, put back request in queue and exit loop
                        # (the task returns to the request queue so it is not lost)
                        self._requests_queue.put(request)
                        break
                    try:
                        # not dismissed: run the task handler and queue its result
                        result = request.callable(*request.args, **request.kwds)
                        self._results_queue.put((request, result))
                    except:
                        # the handler raised: flag the request and queue the exception info
                        request.exception = True
                        self._results_queue.put((request, sys.exc_info()))

        def dismiss(self):
            """Sets a flag to tell the thread to exit when done with current job.
            """
            self._dismissed.set()
    ```
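A Python 3 sketch of the same worker loop, written so the put-back behaviour can be checked: a task that arrives after dismiss() goes back to the request queue instead of being dropped. Worker and create_workers() are hypothetical names, and tasks are plain (func, args) tuples rather than threadpool's WorkRequest objects.

```python
import queue
import sys
import threading


class Worker(threading.Thread):
    """Python 3 sketch of WorkerThread; tasks are (func, args) tuples here."""

    def __init__(self, requests_q, results_q, poll_timeout=0.05):
        super().__init__(daemon=True)
        self._requests_q = requests_q
        self._results_q = results_q
        self._poll_timeout = poll_timeout
        self._dismissed = threading.Event()
        self.start()  # like WorkerThread, start as soon as the object exists

    def run(self):
        while not self._dismissed.is_set():
            try:
                task = self._requests_q.get(True, self._poll_timeout)
            except queue.Empty:
                continue
            if self._dismissed.is_set():
                self._requests_q.put(task)  # dismissed while waiting: hand the task back
                break
            func, args = task
            try:
                self._results_q.put((args, True, func(*args)))
            except Exception:
                self._results_q.put((args, False, sys.exc_info()[1]))

    def dismiss(self):
        self._dismissed.set()


def create_workers(num_workers, requests_q, results_q):
    """Counterpart of ThreadPool.createWorkers(): build the workers and keep them in a list."""
    return [Worker(requests_q, results_q) for _ in range(num_workers)]
```

Because the dismiss flag is checked again right after get() returns, every worker either exits without taking the late task or takes it and puts it straight back, so the task is still queued for any workers that remain.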

  3.

    Stopping the worker threads

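    dismissWorkers() pops the requested number of threads off self.workers and sets each one's dismiss event. The thread's run() method notices the event at the start of its next loop iteration, that is, after finishing its current task or within poll_timeout seconds if it is waiting for work, and the thread exits.

    If do_join is set, dismissWorkers() joins the dismissed threads itself before returning. Otherwise it adds them to self.dismissedWorkers, and joinAllDismissedWorkers() joins them later.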
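The two exit paths can be sketched as follows. MiniPool and IdleWorker are hypothetical stand-ins: the method and attribute names follow the ones used in this article, but the bodies are reconstructed from the description above, not copied from the threadpool source.

```python
import threading


class IdleWorker(threading.Thread):
    """Stand-in worker that idles until dismissed (a real one would poll its queue)."""

    def __init__(self):
        super().__init__(daemon=True)
        self._dismissed = threading.Event()
        self.start()

    def run(self):
        self._dismissed.wait()

    def dismiss(self):
        self._dismissed.set()


class MiniPool:
    """Only the worker-bookkeeping part of a pool, using the article's attribute names."""

    def __init__(self, num_workers):
        self.workers = [IdleWorker() for _ in range(num_workers)]
        self.dismissedWorkers = []

    def dismissWorkers(self, num_workers, do_join=False):
        dismiss_list = []
        for _ in range(min(num_workers, len(self.workers))):
            worker = self.workers.pop()
            worker.dismiss()                # set the thread's dismiss event
            dismiss_list.append(worker)
        if do_join:
            for worker in dismiss_list:     # wait for them right here
                worker.join()
        else:
            self.dismissedWorkers.extend(dismiss_list)  # join them later

    def joinAllDismissedWorkers(self):
        for worker in self.dismissedWorkers:
            worker.join()
        self.dismissedWorkers = []
```

Deferring the join lets the caller dismiss workers without waiting for their current tasks to finish, then collect them all at once later.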


Summary

  1.

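    That covers the main parts of the threadpool thread pool and how they are implemented: creating the pool, starting the worker threads and dismissing them. As shown above, the pool is not complicated. It consists of a few simple operations, and the main thing is to understand the overall flow: tasks go into the request queue, worker threads run them and put the results into the result queue, and the results are then handed to the callbacks.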

laozhao
