python线程池:concurrent.futures.ThreadPoolExecutor

1、什么是线程池?

线程池是一组预先创建的线程,可以用于执行并发任务。这些线程在池中等待任务,并且可以重复使用,而不需要频繁地创建和销毁线程。线程池提供了一种有效的方式来管理并发性,避免了线程创建和销毁的开销,从而提高了程序的性能。

2、ThreadPoolExecutor的基本用法

2.1 with(上下文管理器)语法

from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor(max_workers=4) as executor:
     future = executor.submit(function, arg1, arg2)
     print(result.result())
  • 导入模块
  • max_workers:指定线程池最大线程数
  • executor.submit():将任务提交到线程池执行
  • result():获取任务执行结果

2.2 一般语法

from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=2)
future = executor.submit(get_html, (3))
print(future.result())

3、submit返回的future对象

3.1 什么是future对象

Future(未来对象)是一种用于异步编程的概念,它代表了一个尚未完成的操作或任务。Future对象提供了一种机制,允许您在操作尚未完成时执行其他操作,然后在操作完成时获取其结果。

Future对象通常具有以下特征和方法:

  1. 状态Future对象可以处于不同的状态,通常包括尚未完成、已完成、已取消等状态。状态信息可以帮助您确定操作的进展和结果。
  2. 结果Future对象代表了一个操作的结果,当操作完成时,可以使用result()方法获取该结果。
  3. 异常 :如果操作在执行过程中引发了异常,Future对象也可以包含异常信息,可以使用exception()方法获取。
  4. 回调 :您可以注册回调函数,以便在操作完成时自动执行特定操作。这对于处理异步任务的完成事件非常有用。
  5. 取消 :一些Future对象允许您尝试取消尚未执行的操作。
  6. 等待 :您可以等待Future对象完成,以获取其结果或等待多个Future对象中的一个或多个完成。

3.2 Future对象的属性和方法:

  1. result():用于获取任务的执行结果。如果任务尚未完成,调用此方法会阻塞,直到任务完成为止。 可以添加一个timeout参数
  2. done():用于检查任务是否已经完成。如果任务已经完成,done() 返回True;否则返回False
  3. exception():用于获取任务执行过程中引发的异常(如果有的话)。
  4. add_done_callback(fn):允许您注册一个回调函数,当任务完成时,该回调函数将被调用。
  5. cancel():尝试取消任务的执行。如果任务已经开始执行或已经完成,取消操作可能会失败。

示例

from concurrent.futures import ThreadPoolExecutor

def some_function(arg):
    return f"Result: {arg}"

with ThreadPoolExecutor(max_workers=3) as executor:
    future = executor.submit(some_function, 42)


    # 获取异常信息
    if future1.exception() is not None:
        print(f"Task 1 encountered an exception: {future1.exception()}")

    # 检查任务是否已经完成
    if future.done():
        result = future.result()
        print(f"Task completed with result: {result}")
    else:
        print("Task is still running")

    # 注册一个回调函数
    def callback(fut):
        result = fut.result()
        print(f"Callback: Task completed with result: {result}")

    future.add_done_callback(callback)

    # 取消任务(如果可能)
    if not future.done():
        future.cancel()

4、关闭线程池

4.1 shutdown

from concurrent.futures import ThreadPoolExecutor

# 创建线程池
executor = ThreadPoolExecutor()

# 关闭线程池
executor.shutdown()
  • 线程池会等待所有已提交的任务完成执行,然后才会关闭。 阻塞

  • 可以携带参数wait(bool类型),默认为True。当为False时,线程池会立即关闭,而不会等待所有任务完成。任何已经提交但尚未开始执行的任务都会被取消。

4.2 with

如果用了上下文管理器with,那么不需要显示的关闭线程池。with会在所有线程任务执行完后帮我们自动关闭,不过它是 阻塞 的。

5、wait()方法

5.1 语法

wait() 方法用于等待一组Future对象完成。它接受一个包含Future对象的可迭代对象,并可以设置超时时间。

wait(fs, timeout=**None**, return_when=ALL_COMPLETED)

  • fs: 表示需要执行的序列
  • timeout: 等待的最大时间,如果超过这个时间即使线程未执行完成也将返回
  • return_when:表示wait返回结果的条件,默认为 ALL_COMPLETED 全部执行完成再返回;等待条件还可以设置为 FIRST_COMPLETED ,表示第一个任务完成就停止等待。

    from concurrent.futures import ThreadPoolExecutor, wait

    with ThreadPoolExecutor(max_workers=3) as executor: futures = [executor.submit(some_function, arg) for arg in args]

    done, not_done = wait(futures, timeout=30)

wait() 方法将等待所有Future对象完成或者达到超时时间。它返回两集合,一个包含已完成的Future对象,另一个包含未完成的。

6、 as_completed()

as_completed() 方法用于迭代Future对象的集合,返回一个生成器,它会在Future对象完成时产生结果。

as_completed() 返回的生成器会动态生成已经完成的Future对象,允许您在它们完成时立即处理结果。( 非顺序,按照完成先后顺序

from concurrent.futures import ThreadPoolExecutor, as_completed

with ThreadPoolExecutor(max_workers=3) as executor:
    futures = [executor.submit(some_function, arg) for arg in args]

    for future in as_completed(futures):
        result = future.result()
        print(f"Task completed with result: {result}")

发表评论

评论列表,共 0 条评论

    暂无评论