from multiprocessing import Process
from time import sleep

def job(sleep_tine):
    """设定任务"""
    print('job工作开始')
    sleep(sleep_tine)   # 等待
    print('job工作结束')

if __name__ == '__main__':
    # 1. 主进程不等待子进程执行完毕
    print('主进程开始')
    j = Process(target=job,args=(2,))   # 创建Process对象,设定延时2秒
    j.start()   # 开始进程
    print('主进程结束')
    # 2. 主进程等待子进程执行完毕
    print('主进程开始')
    j = Process(target=job, args=(2,))  # 创建Process对象,设定延时2秒
    j.start()  # 开始进程
    j.join()    # 添加此语句  表示等待进程j终止
    print('主进程结束')
    # 3. 主进程等待子进程一定事件后,若子进程仍未执行完毕,则继续执行主进程
    print('主进程开始')
    j = Process(target=job, args=(2,))  # 创建Process对象,设定延时2秒
    j.start()  # 开始进程
    j.join(timeout=1)  # 添加timeout参数表示等待进程j的时间
    print('打印进程信息:\n进程pid:{},进程名称:{},进程是否存活:{}'.format(j.pid,j.name,j.is_alive()))
    print('主进程结束')

常用方法

方法 描述
is_alive() 如果p仍然运行,返回True
join([timeout]) 等待进程p终止。Timeout是可选的超时期限,进程可以被链接无数次,但如果连接自身则会出错
run() 进程启动时运行的方法。默认情况下,会调用传递给Process构造函数的target。定义进程的另一种方法是继承Process类并重新实现run()函数
start() 启动进程,这将运行代表进程的子进程,并调用该子进程中的run()函数
terminate() 强制终止进程。如果调用此函数,进程p将被立即终止,同时不会进行任何清理动作。如果进程p创建了它自己的子进程,这些进程将变为僵尸进程。使用此方法时需要特别小心。如果p保存了一个锁或参与了进程间通信,那么终止它可能会导致死锁或I/O损坏

进程池

方法 描述
apply(func [,args [,kwargs]]) 在一个池工作进程中执行函数(*args,**kwargs),然后返回结果。
apply_async(func [, args [,kwargs [,callback ] ] ]) 在一个池工作进程中异步地执行函数(*args,**kwargs),然后返回结果。此方法的结果是AsyncResult类的实例,稍后可用于获得最终结果。Callback是可调用对象,接受输入参数。当func的结果变为可用时,将立即传递给callback。Callback禁止执行任何阻塞操作,否则将阻塞接收其他异步操作中的结果
close() 关闭进程池,防止进行进一步操作。如果还有挂起的操作,它们将在工作进程终止之前完成
join() 等待所有工作进程退出。此方法只能在close()或者terminate()方法之后调用
imap( func,iterable [ ,chunksize] ) map()函数的版本之一,返回迭代器而非结果列表
imap_unordered( func,iterable [,chunksize] ) 同imap()函数一样,只是结果的顺序根据从工作进程接收到的时间任意确定
map( func,iterable [,chunksize] ) 将可调用对象func应用给iterable中的所有项,然后以列表的形式返回结果。通过将iterable划分为多块并将工作分派给工作进程,可以并行地执行这项操作。chunksize指定每块中的项数。如果数量较大,可以增大chunksize的值来提升性能
map_async( func,iterable [,chunksize [,callback]] ) 同map()函数,但结果的返回是异步的。返回值是AsyncResult类的实例,稍后可用与获取结果。Callback是指接受一个参数的可调对象。如果提供callable,当结果变为可用时,将使用结果调用callable
terminate() 立即终止所有工作进程,同时不执行任何清理或结束任何挂起工作。如果p被垃圾回收,将自动调用此函数
get( [ timeout] ) 返回结果,如果有必要则等待结果到达。Timeout是可选的超时。如果结果在指定时间内没有到达,将引发multiprocessing.TimeoutError异常。如果远程操作中引发了异常,它将在调用此方法时再次被引发
ready() 如果调用完成,则返回True
sucessful() 如果调用完成且没有引发异常,返回True。如果在结果就绪之前调用此方法,将引发AssertionError异常
wait([timeout]) 等待结果变为可用。Timeout是可选的超时
注意: apply_async(func[, args[, kwds[, callback]]]) 是非阻塞,apply(func[, args[, kwds]])是阻塞的
""" 多线程的简单使用
线程池的使用
"""
from multiprocessing import Pool
from time import sleep


def job(id):
    print('开始{}'.format(id))
    sleep(3)
    print('结束{}'.format(id))

if __name__ == '__main__':
    """进程池的使用(非阻塞)"""
    pool = Pool(processes=3)    # 创建线程池 线程数为3
    for i in range(5):
        # 执行的进程总数保持为3,只由当三个线程中有执行完毕的才添加新线程
        pool.apply_async(job,(i,))  # 传入的参数为:要执行的函数名  以及函数所需参数(元组)
    # 关闭线程池(不再接受新的线程请求)
    pool.close()
    # 注意:调用join之前需要先关闭线程池
    pool.join()     # join函数表示等待所有子进程结束
    """进程池的使用(阻塞)"""
    pool = Pool(processes=3)  # 创建线程池 线程数为3
    for i in range(5):
        # 执行的进程总数保持为3,只由当三个线程中有执行完毕的才添加新线程
        pool.apply(job, (i,))  # 传入的参数为:要执行的函数名  以及函数所需参数(元组)
    # 关闭线程池(不再接受新的线程请求)
    pool.close()
    # 注意:调用join之前需要先关闭线程池
    pool.join()  # join函数表示等待所有子进程结束
非阻塞状态执行结果
阻塞状态执行结果

子进程之间数据不共享

""" 多线程的简单使用
子进程之间数据不共享 
"""
from multiprocessing import Process

COUNT = 0  # 设置全局变量COUNT

def add_number1(num):
    """修改全局变量"""
    global COUNT
    print('add_number1')
    COUNT += num
    print(COUNT)

def add_number2(num):
    """修改全局变量"""
    global COUNT
    print('add_number2')
    COUNT += num
    print(COUNT)

if __name__ == '__main__':
    p = Process(target=add_number1, args=(3,))
    p.start()
    p.join()
    p = Process(target=add_number2, args=(4,))
    p.start()
    p.join()
    print('最终结果:COUNT={}'.format(COUNT))
运行结果

可以发现,COUNT设定值为0,在add_number1和add_number2函数所创建的线程的操作并没有相互影响,且原值没有改变

队列的简单使用

方法 描述
cancle_join_thread() 不会在进程退出时自动连接后台线程。这可以防止join_thread()方法阻塞
close() 关闭队列,防止队列中加入更多数据。调用此方法时,后台线程将继续写入那些已入队列尚未写入数据,但将在此方法完成时马上关闭
empty() 如果调用此方法时q为空,返回True
full() 如果q已满,返回True
get([block [,timeout]) 返回q中的一个项。如果q为空,此方法将阻塞,直到队列中有项可用为止。Block用于控制阻塞行为,默认为True。如果设置为False,将引发Queue.Empty异常(定义在Queue模块中)。Timeout是可选超时时间,用在阻塞模式中。如果在指定的时间间隔内没有项变为可用,将引发Queue.Empty异常
join_thread() 连接队列的后台线程。此方法用于在调用q.close()方法之后,等待所有队列项被消耗。默认情况下此方法由不是q的原始创建者的所有进程调用。调用q.cancle_join_thread()方法可以禁止这种行为
put(item [ ,  block  [,  timeout]]) 将item放入队列。如果队列已满,此方法将阻塞至有空间可用为止。Block控制阻塞行为,默认为True。如果设置为False,将引发Queue.Empty异常(定义在Queue模块中)。Timeout指定在阻塞模式中等待可用时空间的时间长短。超时后将引发Queue.Full异常。
qsize() 返回目前队列中项的正确数量。
joinableQueue([maxsize]) 创建可连接的共享进程队列。这就像是一个Queue对象,但队列允许项的消费者通知生产者项已经被成功处理。通知进程是使用共享的信号和条件变量来实现的
task_done() 消费者使用此方法发出信号,表示q.get()返回的项已经被处理。如果调用此方法的次数大于从队列中删除的项的数量,将引发ValueError异常
join() 生产者使用此方法进行阻塞,直到队列中的所有项均被处理。阻塞将持续到位队列中的每个项均调用q.task_done()方法为止
""" 多线程的简单使用
队列的简单使用
"""

from multiprocessing import Queue

# 创建队列
que = Queue(4)  # 参数为队列容量,默认无限

# 向队列中放置内容
que.put('内容1')
que.put('内容2')
que.put('内容3')
que.put('内容4')
# print('队列是否已满:',que.full())
# que.put('内容5',timeout=3)  # 因为队列已满,所以内容5无法放置    设置超时 若超过设置的时间仍未放置则报出异常“queue.Full”
print('队列是否已满:', que.full())
# 可以在写入时先判断队列是否已满,若未满则放入
if not que.full():
    que.put('内容5')
# 所以在读取队列时需要先判断队列是否为空,不为空则读取
if not que.empty():
    for i in range(que.qsize()):
        print(que.get())

使用队列实现进程之间的通信

注:线程池内进程之间通信与进程之间通信略有不同

""" 多线程的简单使用
使用队列实现进程之间的通信
"""
from multiprocessing import Process,Queue
from time import sleep

def write_queue(que):
    """将列表元素写入队列"""
    for i in ['你','好','啊']:
        print('开始写入:',i)
        que.put(i)
        sleep(1)

def read_queue(que):
    """从队列中读取元素"""
    print('开始读取')
    while True:
        if not que.empty():
            print('读取元素:',que.get())
        else:
            break

if __name__ == '__main__':
    que = Queue()   # 创建队列
    write_process = Process(target=write_queue,args=(que,)) # 创建写入线程
    read_process = Process(target=read_queue,args=(que,))   # 创建读取进程
    # 启动线程
    write_process.start()
    write_process.join()
    read_process.start()
    read_process.join()
""" 多线程的简单使用
使用队列实现进程池中进程之间的通信
"""
from multiprocessing import Process,Pool,Manager
from time import sleep

def write_queue(que):
    """将列表元素写入队列"""
    for i in ['你','好','啊']:
        print('开始写入:',i)
        que.put(i)
        sleep(1)

def read_queue(que):
    """从队列中读取元素"""
    print('开始读取')
    while True:
        if not que.empty():
            print('读取元素:',que.get())
        else:
            break

if __name__ == '__main__':
    que = Manager().Queue() # 创建队列
    # 创建进程
    pool = Pool(2)
    pool.apply(write_queue,(que,))
    pool.apply(read_queue,(que,))
    pool.close()
    pool.join()
运行结果

使用_thread模块创建线程

_thread.start_new_thread(function, args, kwargs)

""" 多线程的简单使用
使用_thread模块
"""
from _thread import start_new_thread
from time import sleep

def job1(name,sleep_time):
    print('开始运行{}'.format(name))
    sleep(sleep_time)
    print('{}运行结束'.format(name))

def job2(name,sleep_time):
    print('开始运行{}'.format(name))
    sleep(sleep_time)
    print('{}运行结束'.format(name))

if __name__ == '__main__':
    print('主线程运行')
    # 启动线程运行函数
    start_new_thread(job1,('线程1',2))
    start_new_thread(job2,('线程2',3))
    sleep(6)

使用threading模块创建线程

方法名 描述
run() 用以表示线程活动的方法
start() 启动线程活动
join([time]) 等待至线程中止。这阻塞调用线程直至线程的join() 方法被调用中止-正常退出或者抛出未处理的异常-或者是可选的超时发生
isAlive() 返回线程是否活动的
getName() 返回线程名
setName() 设置线程名
""" 多线程的简单使用
使用threading模块
    threading.currentThread()   返回当前的线程变量
    threading.enumerate()       返回一个包含正在运行的线程list
    threading.activeCount()     返回正在运行的线程数量
    Thread(group=None.target=None,name=None,args=(),kwargs={})  # 创建线程
"""
from threading import Thread
from time import sleep

def job1(name,sleep_time):
    print('开始运行{}'.format(name))
    sleep(sleep_time)
    print('{}运行结束'.format(name))

def job2(name,sleep_time):
    print('开始运行{}'.format(name))
    sleep(sleep_time)
    print('{}运行结束'.format(name))

if __name__ == '__main__':
    print('主进程运行')
    # 创建线程
    t1 = Thread(target=job1,args=('线程1',2))
    t2 = Thread(target=job2,args=('线程2',1))
    t1.start()
    t2.start()
    t1.join()
    t2.join()

线程之间共享全局变量

""" 多线程的简单使用
在threading模块 线程之间共享全局变量
注:在一个进程内所有线程共享全局变量,多线程之间的数据共享比多进程好,但是可能造成多个进程同时修改一个变量,造成混乱
"""
from threading import Thread


COUNT = 0  # 设置全局变量COUNT


def add_number1(num):
    """修改全局变量"""
    global COUNT
    print('add_number1')
    COUNT += num
    print(COUNT)


def add_number2(num):
    """修改全局变量"""
    global COUNT
    print('add_number2')
    COUNT += num
    print(COUNT)


if __name__ == '__main__':
    p = Thread(target=add_number1, args=(3,))
    p.start()
    p.join()
    p = Thread(target=add_number2, args=(4,))
    p.start()
    p.join()
    print('最终结果:COUNT={}'.format(COUNT))

使用“互斥锁”防止多个进程修改同一变量导致数据混乱

""" 多线程的简单使用
在threading模块 线程之间共享全局变量
注:在一个进程内所有线程共享全局变量,多线程之间的数据共享比多进程好,但是可能造成多个进程同时修改一个变量,造成混乱
    为了防止混乱,需要使用 “ 互斥锁 ”
锁有两种状态:锁定和未锁定。某个线程要更改共享数据时,要先将其锁定,其它线程不能更改,直到该线程释放资源,将其状态变为非锁定,其它线程才能锁定并修改数据
"""
from threading import Thread, Lock

COUNT = 0  # 设置全局变量COUNT
lock_count = Lock()  # 创建互斥锁


def add_number1():
    """修改全局变量"""
    global COUNT
    print('add_number1')
    for i in range(10000):
        lock_count.acquire()  # 上锁
        COUNT += 1
        lock_count.release()    # 解锁
    print(COUNT)


def add_number2():
    """修改全局变量"""
    global COUNT

    print('add_number2')
    for i in range(10000):
        lock_count.acquire()  # 上锁
        COUNT += 1
        lock_count.release()  # 解锁
    print(COUNT)


if __name__ == '__main__':
    p = Thread(target=add_number1, args=())
    p.start()
    p.join()
    p = Thread(target=add_number2, args=())
    p.start()
    p.join()
    print('最终结果:COUNT={}'.format(COUNT))

线程同步的使用

""" 多线程的简单使用(threading模块)
    线程同步的使用
    如:线程A与B配合,A执行到某一步需要B的某个结果,示意B运行,B运行后将结果给A,A继续执行
    本例中首先对lock2和lock3上锁,所以在三个线程开始后,Task2和Task3由于上锁状态无法执行,首先执行Task1,在执行时先为lock1上锁,执行语句后将
    lock2改为未锁定状态,运行Task2,在执行Task2时为lock2上锁,执行语句后释放lock3,然后Task3执行...
所以运行结果为:运行Task1 --> 运行Task2 --> 运行Task3 --> 运行Task1 --> 运行Task2 --> 运行Task3 ...
"""
from time import sleep
from threading import Thread,Lock

# 创建锁
lock1 = Lock()
lock2 = Lock()
lock3 = Lock()
# 为lock2和lock3上锁
lock2.acquire()
lock3.acquire()

class Task1(Thread):
    def run(self):
        while True:
            if lock1.acquire():
                print('运行Task1')
                sleep(1)
                lock2.release()

class Task2(Thread):
    def run(self):
        while True:
            if lock2.acquire():
                print('运行Task2')
                sleep(1)
                lock3.release()

class Task3(Thread):
    def run(self):
        while True:
            if lock3.acquire():
                print('运行Task3')
                sleep(1)
                lock1.release()

if __name__ == '__main__':
    t1 = Task1()
    t2 = Task2()
    t3 = Task3()
    t1.start()
    t2.start()
    t3.start()

生产者消费者模式

生产者生产商品,然后将其放到类似队列的数据结构中,消费者不找生产者要数据,而是直接从队列中取。

""" 多线程的简单使用(threading模块)
    生产者-消费者模式
    使用队列,生产者将产品放入队列,消费者直接从队列中取出产品
"""
from time import sleep
from threading import Thread
from queue import Queue

class Producer(Thread):
    """生产者"""
    def run(self):
        global queue
        count = 0
        while True:
            # 判断队列的大小
            if queue.qsize()<1000:
                for i in range(100):
                    count += 1
                    message = '生产产品:'+str(count)
                    print(message)
                    queue.put('产品{}'.format(count))
                sleep(0.5)

class Consumer(Thread):
    """消费者"""
    def run(self):
        global queue
        while True:
            if queue.qsize()>100:
                for i in range(10):
                    message = self.name+'消费:'+queue.get()
                    print(message)
                sleep(1)



if __name__ == '__main__':
    queue = Queue() # 创建队列
    p = Producer()  # 创建生产者
    p.start()   # 运行生产者
    sleep(1)
    c = Consumer()  # 创建消费者
    c.start()

ThreadLocal的使用

""" 多线程的简单使用(threading模块)
    ThreadLocal的使用
    ThreadLocal本身是一个全局变量,但是每个线程却可以利用它来保存属于自己的私有数据,这些私有数据对其他线程也是不可见的。
"""
from threading import Thread,local,current_thread

local = local()

def process_student():
    student_name = local.name   # 获取当前线程关联的name
    print('线程名:{}\t学生名:{}'.format(current_thread().name,student_name))

def process_thread(name):
    local.name = name   # 绑定ThreadLocal的名称
    process_student()

if __name__ == '__main__':
    t1 = Thread(target=process_thread,args=('小七',),name='线程A')
    t2 = Thread(target=process_thread,args=('小六',),name='线程B')
    t1.start()
    t2.start()
    t1.join()
    t2.join()

发表回复