""" 多线程的简单使用
队列的简单使用
"""
from multiprocessing import Queue
# 创建队列
que = Queue(4) # 参数为队列容量,默认无限
# 向队列中放置内容
que.put('内容1')
que.put('内容2')
que.put('内容3')
que.put('内容4')
# print('队列是否已满:',que.full())
# que.put('内容5',timeout=3) # 因为队列已满,所以内容5无法放置 设置超时 若超过设置的时间仍未放置则报出异常“queue.Full”
print('队列是否已满:', que.full())
# 可以在写入时先判断队列是否已满,若未满则放入
if not que.full():
que.put('内容5')
# 所以在读取队列时需要先判断队列是否为空,不为空则读取
if not que.empty():
for i in range(que.qsize()):
print(que.get())
使用队列实现进程之间的通信
注:线程池内进程之间通信与进程之间通信略有不同
""" 多线程的简单使用
使用队列实现进程之间的通信
"""
from multiprocessing import Process,Queue
from time import sleep
def write_queue(que):
"""将列表元素写入队列"""
for i in ['你','好','啊']:
print('开始写入:',i)
que.put(i)
sleep(1)
def read_queue(que):
"""从队列中读取元素"""
print('开始读取')
while True:
if not que.empty():
print('读取元素:',que.get())
else:
break
if __name__ == '__main__':
que = Queue() # 创建队列
write_process = Process(target=write_queue,args=(que,)) # 创建写入线程
read_process = Process(target=read_queue,args=(que,)) # 创建读取进程
# 启动线程
write_process.start()
write_process.join()
read_process.start()
read_process.join()
""" 多线程的简单使用
使用队列实现进程池中进程之间的通信
"""
from multiprocessing import Process,Pool,Manager
from time import sleep
def write_queue(que):
"""将列表元素写入队列"""
for i in ['你','好','啊']:
print('开始写入:',i)
que.put(i)
sleep(1)
def read_queue(que):
"""从队列中读取元素"""
print('开始读取')
while True:
if not que.empty():
print('读取元素:',que.get())
else:
break
if __name__ == '__main__':
que = Manager().Queue() # 创建队列
# 创建进程
pool = Pool(2)
pool.apply(write_queue,(que,))
pool.apply(read_queue,(que,))
pool.close()
pool.join()
运行结果
使用_thread模块创建线程
_thread.start_new_thread(function, args, kwargs)
""" 多线程的简单使用
使用_thread模块
"""
from _thread import start_new_thread
from time import sleep
def job1(name,sleep_time):
print('开始运行{}'.format(name))
sleep(sleep_time)
print('{}运行结束'.format(name))
def job2(name,sleep_time):
print('开始运行{}'.format(name))
sleep(sleep_time)
print('{}运行结束'.format(name))
if __name__ == '__main__':
print('主线程运行')
# 启动线程运行函数
start_new_thread(job1,('线程1',2))
start_new_thread(job2,('线程2',3))
sleep(6)
""" 多线程的简单使用
在threading模块 线程之间共享全局变量
注:在一个进程内所有线程共享全局变量,多线程之间的数据共享比多进程好,但是可能造成多个进程同时修改一个变量,造成混乱
"""
from threading import Thread
COUNT = 0 # 设置全局变量COUNT
def add_number1(num):
"""修改全局变量"""
global COUNT
print('add_number1')
COUNT += num
print(COUNT)
def add_number2(num):
"""修改全局变量"""
global COUNT
print('add_number2')
COUNT += num
print(COUNT)
if __name__ == '__main__':
p = Thread(target=add_number1, args=(3,))
p.start()
p.join()
p = Thread(target=add_number2, args=(4,))
p.start()
p.join()
print('最终结果:COUNT={}'.format(COUNT))
使用“互斥锁”防止多个进程修改同一变量导致数据混乱
""" 多线程的简单使用
在threading模块 线程之间共享全局变量
注:在一个进程内所有线程共享全局变量,多线程之间的数据共享比多进程好,但是可能造成多个进程同时修改一个变量,造成混乱
为了防止混乱,需要使用 “ 互斥锁 ”
锁有两种状态:锁定和未锁定。某个线程要更改共享数据时,要先将其锁定,其它线程不能更改,直到该线程释放资源,将其状态变为非锁定,其它线程才能锁定并修改数据
"""
from threading import Thread, Lock
COUNT = 0 # 设置全局变量COUNT
lock_count = Lock() # 创建互斥锁
def add_number1():
"""修改全局变量"""
global COUNT
print('add_number1')
for i in range(10000):
lock_count.acquire() # 上锁
COUNT += 1
lock_count.release() # 解锁
print(COUNT)
def add_number2():
"""修改全局变量"""
global COUNT
print('add_number2')
for i in range(10000):
lock_count.acquire() # 上锁
COUNT += 1
lock_count.release() # 解锁
print(COUNT)
if __name__ == '__main__':
p = Thread(target=add_number1, args=())
p.start()
p.join()
p = Thread(target=add_number2, args=())
p.start()
p.join()
print('最终结果:COUNT={}'.format(COUNT))
线程同步的使用
""" 多线程的简单使用(threading模块)
线程同步的使用
如:线程A与B配合,A执行到某一步需要B的某个结果,示意B运行,B运行后将结果给A,A继续执行
本例中首先对lock2和lock3上锁,所以在三个线程开始后,Task2和Task3由于上锁状态无法执行,首先执行Task1,在执行时先为lock1上锁,执行语句后将
lock2改为未锁定状态,运行Task2,在执行Task2时为lock2上锁,执行语句后释放lock3,然后Task3执行...
所以运行结果为:运行Task1 --> 运行Task2 --> 运行Task3 --> 运行Task1 --> 运行Task2 --> 运行Task3 ...
"""
from time import sleep
from threading import Thread,Lock
# 创建锁
lock1 = Lock()
lock2 = Lock()
lock3 = Lock()
# 为lock2和lock3上锁
lock2.acquire()
lock3.acquire()
class Task1(Thread):
def run(self):
while True:
if lock1.acquire():
print('运行Task1')
sleep(1)
lock2.release()
class Task2(Thread):
def run(self):
while True:
if lock2.acquire():
print('运行Task2')
sleep(1)
lock3.release()
class Task3(Thread):
def run(self):
while True:
if lock3.acquire():
print('运行Task3')
sleep(1)
lock1.release()
if __name__ == '__main__':
t1 = Task1()
t2 = Task2()
t3 = Task3()
t1.start()
t2.start()
t3.start()
生产者消费者模式
生产者生产商品,然后将其放到类似队列的数据结构中,消费者不找生产者要数据,而是直接从队列中取。
""" 多线程的简单使用(threading模块)
生产者-消费者模式
使用队列,生产者将产品放入队列,消费者直接从队列中取出产品
"""
from time import sleep
from threading import Thread
from queue import Queue
class Producer(Thread):
"""生产者"""
def run(self):
global queue
count = 0
while True:
# 判断队列的大小
if queue.qsize()<1000:
for i in range(100):
count += 1
message = '生产产品:'+str(count)
print(message)
queue.put('产品{}'.format(count))
sleep(0.5)
class Consumer(Thread):
"""消费者"""
def run(self):
global queue
while True:
if queue.qsize()>100:
for i in range(10):
message = self.name+'消费:'+queue.get()
print(message)
sleep(1)
if __name__ == '__main__':
queue = Queue() # 创建队列
p = Producer() # 创建生产者
p.start() # 运行生产者
sleep(1)
c = Consumer() # 创建消费者
c.start()