定义线程
最简单的方法:使用target指定线程要执行的目标函数,再使用start()启动。
语法:
class threading.Thread(group=None, target=None, name=None, args=(), kwargs={})
group恒为None,保留未来使用。target为要执行的函数名。name为线程名,默认为Thread-N,通常使用默认即可。但服务器端程序线程功能不同时,建议命名。
#!/usr/bin/env python3
# coding=utf-8
"""Define threads: pass a target function to threading.Thread, then start()."""
import threading


def function(i):
    """Print a line identifying which worker ran.

    Args:
        i: Index supplied by the code that created the thread; echoed in
            the printed message.
    """
    print("function called by thread {0}".format(i))


def main():
    """Start five threads that each run ``function`` and wait for them all."""
    threads = [threading.Thread(target=function, args=(i,)) for i in range(5)]
    # Start every thread before joining any of them. Calling join() inside
    # the start loop would block until each thread finished before the next
    # one was even created, so the threads would never overlap.
    for t in threads:
        t.start()
    for t in threads:
        t.join()


if __name__ == "__main__":
    main()
执行结果:
$ ./threading_define.py
function called by thread 0
function called by thread 1
function called by thread 2
function called by thread 3
function called by thread 4
确定当前线程
#!/usr/bin/env python3
# coding=utf-8
"""Determine the current thread: every worker reports its own thread name."""
import threading
import time


def _announce_and_wait(delay):
    """Print start/exit banners tagged with the current thread's name.

    Args:
        delay: Seconds to sleep between the two banners.
    """
    # current_thread().name replaces the camelCase currentThread().getName()
    # aliases, which are deprecated since Python 3.10.
    name = threading.current_thread().name
    print(name + ' is Starting \n')
    time.sleep(delay)
    print(name + ' is Exiting \n')


def first_function(delay=3):
    """Announce start, sleep ``delay`` seconds (default 3), announce exit."""
    _announce_and_wait(delay)


def second_function(delay=2):
    """Announce start, sleep ``delay`` seconds (default 2), announce exit."""
    _announce_and_wait(delay)


def third_function(delay=1):
    """Announce start, sleep ``delay`` seconds (default 1), announce exit."""
    _announce_and_wait(delay)


if __name__ == "__main__":
    # Each thread is named after its target so the banners show who printed.
    t1 = threading.Thread(name='first_function', target=first_function)
    t2 = threading.Thread(name='second_function', target=second_function)
    t3 = threading.Thread(name='third_function', target=third_function)
    t1.start()
    t2.start()
    t3.start()
执行结果:
$ ./threading_name.py
first_function is Starting
second_function is Starting
third_function is Starting
third_function is Exiting
second_function is Exiting
first_function is Exiting

配合logging模块一起使用:
#!/usr/bin/env python3
# coding=utf-8
"""Thread names in log records: %(threadName)s shows which thread logged."""
import logging
import threading
import time

logging.basicConfig(
    format='[%(levelname)s] (%(threadName)-10s) %(message)s',
    level=logging.DEBUG,
)


def _log_around_sleep(seconds):
    """Log 'Starting', block for ``seconds`` seconds, then log 'Exiting'."""
    logging.debug('Starting')
    time.sleep(seconds)
    logging.debug('Exiting')


def worker():
    """Short job: two seconds pass between its two log records."""
    _log_around_sleep(2)


def my_service():
    """Longer job: three seconds pass between its two log records."""
    _log_around_sleep(3)


t = threading.Thread(target=my_service, name='my_service')
w = threading.Thread(target=worker, name='worker')
w2 = threading.Thread(target=worker)  # use default name

# Launch order: the named worker, the default-named worker, then the service.
for thread in (w, w2, t):
    thread.start()
执行结果:
$ ./threading_names_log.py
[DEBUG] (worker    ) Starting
[DEBUG] (Thread-1  ) Starting
[DEBUG] (my_service) Starting
[DEBUG] (worker    ) Exiting
[DEBUG] (Thread-1  ) Exiting
[DEBUG] (my_service) Exiting
在子类中使用线程
前面我们的线程都是以结构化编程的形式来创建的。通过继承threading.Thread类也可以创建线程。Thread类首先完成一些基本的初始化,然后调用它的run()。默认的run()方法会调用传递给构造函数的目标函数;在子类中重写run()即可定义线程要执行的操作。
#!/usr/bin/env python3
# coding=utf-8
"""Create threads by subclassing threading.Thread and overriding run()."""
import logging
import threading
import time

# Set to a true value to make print_time() terminate the calling thread.
exitFlag = 0


class myThread(threading.Thread):
    """Thread that prints the current time five times, pausing before each.

    Attributes set by __init__:
        threadID: Caller-supplied identifier; stored only.
        name: Thread name, used as the prefix of every printed line.
        counter: Seconds to sleep before each print (passed on as the delay).
    """

    def __init__(self, threadID, name, counter):
        super().__init__()
        self.threadID = threadID
        self.name = name
        self.counter = counter

    def run(self):
        """Print a start banner, five timestamps, then an exit banner."""
        print("Starting " + self.name)
        print_time(self.name, self.counter, 5)
        print("Exiting " + self.name)


def print_time(threadName, delay, counter):
    """Print ``counter`` timestamped lines, sleeping ``delay`` before each.

    Args:
        threadName: Prefix for every printed line.
        delay: Seconds to sleep before each line.
        counter: Number of lines to print; zero or negative prints nothing.

    Raises:
        SystemExit: If the module-level ``exitFlag`` is true when an
            iteration begins.
    """
    while counter > 0:
        if exitFlag:
            # Python 3 has no ``thread`` module; raising SystemExit is what
            # the low-level thread exit() call does, and the threading
            # module ends the thread silently when it propagates out of run().
            raise SystemExit
        time.sleep(delay)
        print("%s: %s" % (threadName, time.ctime(time.time())))
        counter -= 1


def main():
    """Start two timer threads and return without waiting for them."""
    # Create new threads
    thread1 = myThread(1, "Thread-1", 1)
    thread2 = myThread(2, "Thread-2", 2)

    # Start new Threads
    thread1.start()
    thread2.start()
    # No join(): the main thread announces its exit while the workers are
    # still running; the interpreter waits for the non-daemon threads.
    print("Exiting Main Thread")


if __name__ == "__main__":
    main()
执行结果:
$ ./threading_subclass.py
Starting Thread-1
Starting Thread-2
Exiting Main Thread
Thread-1: Tue Sep 15 11:03:21 2015
Thread-2: Tue Sep 15 11:03:22 2015
Thread-1: Tue Sep 15 11:03:22 2015
Thread-1: Tue Sep 15 11:03:23 2015
Thread-2: Tue Sep 15 11:03:24 2015
Thread-1: Tue Sep 15 11:03:24 2015
Thread-1: Tue Sep 15 11:03:25 2015
Exiting Thread-1
Thread-2: Tue Sep 15 11:03:26 2015
Thread-2: Tue Sep 15 11:03:28 2015
Thread-2: Tue Sep 15 11:03:30 2015
Exiting Thread-2
锁
python的内置数据结构比如列表和字典等,其单个操作是线程安全的;但对整数、浮点数等简单数据类型的"读取-修改-写回"复合操作(例如 +=)不是原子的,因此不是线程安全的。要在多个线程中安全地修改这些简单数据类型,就需要使用锁。
#!/usr/bin/env python3
# coding=utf-8
"""Compare a lock-protected shared counter with an unprotected one."""
import threading

shared_resource_with_lock = 0
shared_resource_with_no_lock = 0
COUNT = 100000
shared_resource_lock = threading.Lock()


####LOCK MANAGEMENT##
def increment_with_lock():
    """Add 1 to ``shared_resource_with_lock`` COUNT times, under the lock."""
    global shared_resource_with_lock
    for _ in range(COUNT):
        # ``with`` releases the lock even if the body raises, which a bare
        # acquire()/release() pair does not guarantee.
        with shared_resource_lock:
            shared_resource_with_lock += 1


def decrement_with_lock():
    """Subtract 1 from ``shared_resource_with_lock`` COUNT times, under the lock."""
    global shared_resource_with_lock
    for _ in range(COUNT):
        with shared_resource_lock:
            shared_resource_with_lock -= 1


####NO LOCK MANAGEMENT ##
def increment_without_lock():
    """Add 1 to ``shared_resource_with_no_lock`` COUNT times, with no locking."""
    global shared_resource_with_no_lock
    for _ in range(COUNT):
        shared_resource_with_no_lock += 1


def decrement_without_lock():
    """Subtract 1 from ``shared_resource_with_no_lock`` COUNT times, with no locking."""
    global shared_resource_with_no_lock
    for _ in range(COUNT):
        shared_resource_with_no_lock -= 1


####the Main program
if __name__ == "__main__":
    t1 = threading.Thread(target=increment_with_lock)
    t2 = threading.Thread(target=decrement_with_lock)
    t3 = threading.Thread(target=increment_without_lock)
    t4 = threading.Thread(target=decrement_without_lock)
    workers = (t1, t2, t3, t4)
    for t in workers:
        t.start()
    for t in workers:
        t.join()
    print("the value of shared variable with lock management is %s"
          % shared_resource_with_lock)
    print("the value of shared variable with race condition is %s"
          % shared_resource_with_no_lock)
执行结果:
$ ./threading_lock.py
the value of shared variable with lock management is 0
the value of shared variable with race condition is 0

(注:无锁版本的结果取决于线程调度,不保证为0;本次运行恰好为0。加锁版本的结果始终为0。)
又如:
"""Protect a shared counter with a Lock while several threads increment it."""
import logging
import random
import threading
import time

logging.basicConfig(level=logging.DEBUG,
                    format='(%(threadName)-10s) %(message)s',
                    )


class Counter(object):
    """Integer counter whose increment is serialized by a lock.

    Attributes set by __init__:
        lock: threading.Lock guarding ``value``.
        value: Current count.
    """

    def __init__(self, start=0):
        self.lock = threading.Lock()
        self.value = start

    def increment(self):
        """Add 1 to ``value`` under the lock, then hold the lock 1-3 seconds.

        The sleep happens while the lock is held so that contention between
        threads is visible in the log output.
        """
        logging.debug(time.ctime(time.time()))
        logging.debug('Waiting for lock')
        # ``with`` acquires the lock and always releases it on exit.
        with self.lock:
            pause = random.randint(1, 3)
            logging.debug(time.ctime(time.time()))
            logging.debug('Acquired lock')
            self.value = self.value + 1
            logging.debug('lock {0} seconds'.format(pause))
            time.sleep(pause)


def worker(c):
    """Twice: sleep a random 1-3 seconds, then increment counter ``c``."""
    for _ in range(2):
        pause = random.randint(1, 3)
        logging.debug(time.ctime(time.time()))
        logging.debug('Sleeping %0.02f', pause)
        time.sleep(pause)
        c.increment()
    logging.debug('Done')


def main():
    """Run two workers against one Counter and log the final value."""
    counter = Counter()
    workers = [threading.Thread(target=worker, args=(counter,))
               for _ in range(2)]
    for t in workers:
        t.start()

    logging.debug('Waiting for worker threads')
    # Join exactly the threads started here instead of scanning
    # threading.enumerate() for everything except the main thread.
    for t in workers:
        t.join()
    logging.debug('Counter: %d', counter.value)


if __name__ == "__main__":
    main()
执行结果:
$ python threading_lock.py
(Thread-1  ) Tue Sep 15 15:49:18 2015
(Thread-1  ) Sleeping 3.00
(Thread-2  ) Tue Sep 15 15:49:18 2015
(MainThread) Waiting for worker threads
(Thread-2  ) Sleeping 2.00
(Thread-2  ) Tue Sep 15 15:49:20 2015
(Thread-2  ) Waiting for lock
(Thread-2  ) Tue Sep 15 15:49:20 2015
(Thread-2  ) Acquired lock
(Thread-2  ) lock 2 seconds
(Thread-1  ) Tue Sep 15 15:49:21 2015
(Thread-1  ) Waiting for lock
(Thread-2  ) Tue Sep 15 15:49:22 2015
(Thread-1  ) Tue Sep 15 15:49:22 2015
(Thread-2  ) Sleeping 2.00
(Thread-1  ) Acquired lock
(Thread-1  ) lock 1 seconds
(Thread-1  ) Tue Sep 15 15:49:23 2015
(Thread-1  ) Sleeping 2.00
(Thread-2  ) Tue Sep 15 15:49:24 2015
(Thread-2  ) Waiting for lock
(Thread-2  ) Tue Sep 15 15:49:24 2015
(Thread-2  ) Acquired lock
(Thread-2  ) lock 1 seconds
(Thread-1  ) Tue Sep 15 15:49:25 2015
(Thread-1  ) Waiting for lock
(Thread-1  ) Tue Sep 15 15:49:25 2015
(Thread-1  ) Acquired lock
(Thread-1  ) lock 2 seconds
(Thread-2  ) Done
(Thread-1  ) Done
(MainThread) Counter: 4
向acquire()传入False(非阻塞模式)时,调用会立即返回而不等待:返回True表示获得了锁,返回False表示锁正被其他线程持有。比如:
"""Non-blocking lock acquisition: poll a lock that another thread toggles."""
import logging
import threading
import time

logging.basicConfig(level=logging.DEBUG,
                    format='(%(threadName)-10s) %(message)s',
                    )


def lock_holder(lock, interval=0.5):
    """Forever alternate between holding and not holding ``lock``.

    Args:
        lock: The lock to hold and release repeatedly.
        interval: Seconds spent in each state (holding, then not holding).

    This function never returns; run it in a daemon thread.
    """
    logging.debug('Starting')
    while True:
        lock.acquire()
        try:
            logging.debug('Holding')
            time.sleep(interval)
        finally:
            logging.debug('Not holding')
            lock.release()
        time.sleep(interval)


def worker(lock, interval=0.5):
    """Poll ``lock`` without blocking until it has been acquired three times.

    Args:
        lock: The lock to try to acquire.
        interval: Seconds to wait before each attempt.
    """
    logging.debug('Starting')
    num_tries = 0
    num_acquires = 0
    while num_acquires < 3:
        time.sleep(interval)
        logging.debug('Trying to acquire')
        # Non-blocking attempt: returns immediately with True or False.
        have_it = lock.acquire(False)
        try:
            num_tries += 1
            if have_it:
                logging.debug('Iteration %d: Acquired', num_tries)
                num_acquires += 1
            else:
                logging.debug('Iteration %d: Not acquired', num_tries)
        finally:
            if have_it:
                lock.release()
    logging.debug('Done after %d iterations', num_tries)


def main():
    """Start the daemon lock holder and the polling worker."""
    lock = threading.Lock()

    holder = threading.Thread(target=lock_holder, args=(lock,),
                              name='LockHolder')
    # The daemon attribute replaces the deprecated setDaemon(); a daemon
    # thread does not keep the process alive once the worker is done.
    holder.daemon = True
    holder.start()

    # Named worker_thread so the Thread object does not rebind the
    # worker() function.
    worker_thread = threading.Thread(target=worker, args=(lock,),
                                     name='Worker')
    worker_thread.start()


if __name__ == "__main__":
    main()
执行结果:
$ python threading_lock_noblock.py
(LockHolder) Starting
(LockHolder) Holding
(Worker    ) Starting
(LockHolder) Not holding
(Worker    ) Trying to acquire
(Worker    ) Iteration 1: Acquired
(LockHolder) Holding
(Worker    ) Trying to acquire
(Worker    ) Iteration 2: Not acquired
(LockHolder) Not holding
(Worker    ) Trying to acquire
(Worker    ) Iteration 3: Acquired
(LockHolder) Holding
(Worker    ) Trying to acquire
(Worker    ) Iteration 4: Not acquired
(LockHolder) Not holding
(Worker    ) Trying to acquire
(Worker    ) Iteration 5: Acquired
(Worker    ) Done after 5 iterations
可重入锁(RLock)
threading.RLock()
返回可重入锁对象。重入锁必须由获得它的线程释放。一旦线程获得了重入锁,同一线程可不阻塞地再次获得;每获取一次都必须对应释放一次,锁才会真正被释放。而普通的Lock不可重入,已持有锁的线程无法再次获取它:
"""Show that a plain Lock cannot be acquired twice by the same thread."""
import threading

lock = threading.Lock()

# print() calls replace the Python 2 print statements, which are a syntax
# error on Python 3.
first_try = lock.acquire()
print('First try :', first_try)

# Non-blocking attempt: a blocking acquire() here would deadlock, because
# the lock is already held by this same thread.
second_try = lock.acquire(False)
print('Second try:', second_try)
执行结果:
$ python threading_lock_reacquire.py
First try : True
Second try: False
使用RLock可以获取多次锁:
"""Show that an RLock can be acquired again by the thread that holds it."""
import threading

lock = threading.RLock()

# print() calls replace the Python 2 print statements, which are a syntax
# error on Python 3.
first_try = lock.acquire()
print('First try :', first_try)

# Non-blocking attempt: succeeds because the owning thread may re-enter.
second_try = lock.acquire(False)
print('Second try:', second_try)
执行结果:
$ python threading_rlock.py
First try : True
Second try: 1

(注:以上为Python 2下的输出;在Python 3中RLock.acquire()返回布尔值,第二行显示为"Second try: True"。)
#!/usr/bin/env python3
# coding=utf-8
"""RLock example: methods that hold the lock call another locking method."""
import threading
import time


class Box(object):
    """Item counter whose methods are serialized by one class-wide RLock.

    ``add`` and ``remove`` take the lock and then call ``execute``, which
    takes the same lock again; that nested acquisition by the same thread
    is why the lock is an RLock.
    """

    # One lock shared by every Box instance.
    lock = threading.RLock()

    def __init__(self):
        self.total_items = 0

    def execute(self, n):
        """Add ``n`` (may be negative) to ``total_items`` under the lock."""
        # ``with`` releases the lock even if the body raises.
        with Box.lock:
            self.total_items += n

    def add(self):
        """Put one item into the box."""
        with Box.lock:
            self.execute(1)

    def remove(self):
        """Take one item out of the box."""
        with Box.lock:
            self.execute(-1)


## These two functions run n in separate
## threads and call the Box's methods
def adder(box, items):
    """Add one item to ``box`` every 5 seconds, ``items`` times."""
    while items > 0:
        print("adding 1 item in the box\n")
        box.add()
        time.sleep(5)
        items -= 1


def remover(box, items):
    """Remove one item from ``box`` every 5 seconds, ``items`` times."""
    while items > 0:
        print("removing 1 item in the box")
        box.remove()
        time.sleep(5)
        items -= 1


## the main program build some
## threads and make sure it works
if __name__ == "__main__":
    items = 5
    print("putting %s items in the box " % items)
    box = Box()
    t1 = threading.Thread(target=adder, args=(box, items))
    t2 = threading.Thread(target=remover, args=(box, items))
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    print("%s items still remain in the box " % box.total_items)
执行结果:
$ python3 threading_rlock2.py
putting 5 items in the box
adding 1 item in the box
removing 1 item in the box
adding 1 item in the box
removing 1 item in the box
adding 1 item in the box
removing 1 item in the box
removing 1 item in the box
adding 1 item in the box
removing 1 item in the box
adding 1 item in the box
0 items still remain in the box