Python - Thread Pools

Một pool luồng là một cơ chế tự động quản lý nhiều luồng một cách hiệu quả, cho phép các tác vụ được thực hiện đồng thời. Python không cung cấp việc quản lý pool luồng trực tiếp thông qua mô-đun threading .

Thay vào đó, nó cung cấp việc quản lý nhóm luồng thông qua mô-đun multiprocessing.dummy và mô-đun concurrent.futures . Các mô-đun này cung cấp giao diện thuận tiện để tạo và quản lý nhóm luồng, giúp dễ dàng thực hiện các tác vụ đồng thời.

What is a Thread Pool?

Một pool luồng là một tập hợp các luồng được quản lý bởi một pool. Mỗi luồng trong pool được gọi là một worker hoặc một luồng worker. Những luồng này có thể được tái sử dụng để thực hiện nhiều nhiệm vụ khác nhau, điều này giúp giảm bớt gánh nặng của việc tạo ra và hủy bỏ các luồng liên tục.

Các pool luồng kiểm soát việc tạo ra các luồng và vòng đời của chúng, giúp chúng hiệu quả hơn trong việc xử lý một số lượng lớn các tác vụ.

Chúng ta có thể triển khai các pool luồng trong Python bằng cách sử dụng các lớp sau −

  • Python ThreadPool Class
  • Python ThreadPoolExecutor Class

Using Python ThreadPool Class

Lớp multiprocessing.pool.ThreadPool cung cấp một giao diện pool luồng trong mô-đun multiprocessing . Nó quản lý một pool các luồng làm việc mà các công việc có thể được gửi đến để thực hiện đồng thời.

Một đối tượng ThreadPool đơn giản hóa việc quản lý nhiều luồng bằng cách xử lý việc tạo ra và phân phối các tác vụ giữa các luồng làm việc. Nó chia sẻ một giao diện với lớp Pool , ban đầu được thiết kế cho các tiến trình, nhưng đã được điều chỉnh để hoạt động với các luồng.

Các phiên bản ThreadPool hoàn toàn tương thích với các phiên bản Pool và nên được quản lý như một trình quản lý ngữ cảnh hoặc bằng cách gọi close() và terminate() một cách thủ công.

Example

Ví dụ này minh họa việc thực thi song song các hàm bình phương và lập phương trên danh sách số bằng cách sử dụng thread pool của Python, trong đó mỗi hàm được áp dụng cho các số một cách đồng thời với tối đa 3 luồng, mỗi luồng có độ trễ 1 giây giữa các lần thực thi.

from multiprocessing.dummy import Pool as ThreadPool
import time

def square(number):
   sqr = number * number
   time.sleep(1)
   print("Number:{} Square:{}".format(number, sqr))

def cube(number):
   cub = number*number*number
   time.sleep(1)
   print("Number:{} Cube:{}".format(number, cub))

numbers = [1, 2, 3, 4, 5]
pool = ThreadPool(3)
pool.map(square, numbers)
pool.map(cube, numbers)

pool.close()

Output

Khi thực thi đoạn mã trên, bạn sẽ nhận được đầu ra sau:

Number:2 Square:4
Number:1 Square:1
Number:3 Square:9
Number:4 Square:16
Number:5 Square:25
Number:1 Cube:1
Number:2 Cube:8
Number:3 Cube:27
Number:4 Cube:64
Number:5 Cube:125

Using Python ThreadPoolExecutor Class

Lớp ThreadPoolExecutor của Python trong mô-đun concurrent.futures cung cấp một giao diện cấp cao để thực thi bất đồng bộ các hàm bằng cách sử dụng các luồng. Mô-đun concurrent.futures bao gồm lớp Future và hai lớp Executor − ThreadPoolExecutor ProcessPoolExecutor .

The Future Class

Lớp concurrent.futures.Future chịu trách nhiệm xử lý việc thực thi bất đồng bộ của bất kỳ callable nào như một hàm. Để có được một đối tượng Future , bạn nên gọi phương thức submit() trên bất kỳ đối tượng Executor . Nó không nên được tạo trực tiếp bằng cách sử dụng constructor của nó.

Các phương thức quan trọng trong lớp Future là −

  • result(timeout=None) : This method returns the value returned by the call. If the call hasn't yet completed, then this method will wait up to timeout seconds. If the call hasn't completed in timeout seconds, then a TimeoutError will be raised. If timeout is not specified, there is no limit to the wait time.
  • cancel() : This method, attempt to cancel the call. If the call is currently being executed or finished running and cannot be cancelled then the method will return a boolean value False . Otherwise the call will be cancelled and the method returns True.
  • cancelled() : Returns True if the call was successfully cancelled.
  • running() : Returns True if the call is currently being executed and cannot be cancelled.
  • done() : Returns True if the call was successfully cancelled or finished running.

The ThreadPoolExecutor Class

Lớp này đại diện cho một nhóm các luồng công nhân tối đa được chỉ định để thực hiện các cuộc gọi một cách không đồng bộ.

concurrent.futures.ThreadPoolExecutor(max_threads)

Example

Dưới đây là một ví dụ sử dụng lớp concurrent.futures.ThreadPoolExecutor để quản lý và thực thi các tác vụ không đồng bộ trong Python. Cụ thể, nó cho thấy cách gửi nhiều tác vụ đến một pool luồng và cách kiểm tra trạng thái thực thi của chúng.

from concurrent.futures import ThreadPoolExecutor
from time import sleep
def square(numbers):
   for val in numbers:
      ret = val*val
      sleep(1)
      print("Number:{} Square:{}".format(val, ret))
def cube(numbers):
   for val in numbers:
      ret = val*val*val
      sleep(1)
      print("Number:{} Cube:{}".format(val, ret))
if __name__ == '__main__':
   numbers = [1,2,3,4,5]
   executor = ThreadPoolExecutor(4)
   thread1 = executor.submit(square, (numbers))
   thread2 = executor.submit(cube, (numbers))
   print("Thread 1 executed ? :",thread1.done())
   print("Thread 2 executed ? :",thread2.done())
   sleep(2)
   print("Thread 1 executed ? :",thread1.done())
   print("Thread 2 executed ? :",thread2.done())

Nó sẽ tạo ra output

Thread 1 executed ? : False
Thread 2 executed ? : False
Number:1 Square:1
Number:1 Cube:1
Thread 1 executed ? : False
Thread 2 executed ? : False
Number:2 Square:4
Number:2 Cube:8
Number:3 Square:9
Number:3 Cube:27
Number:4 Square:16
Number:4 Cube:64
Number:5 Square:25
Number:5 Cube:125