# CS 100 Concurrency Demo

In [4]:
import math
import threading
from queue import Queue
from threading import Thread

class Worker(Thread):
    """Daemon thread that pulls ``(func, args, kargs)`` tuples off a queue and runs them."""

    def __init__(self, tasks):
        """Remember the shared task queue and start running immediately."""
        super().__init__(daemon=True)
        self.tasks = tasks
        self.start()

    def run(self):
        """Consume tasks forever; a failing task is reported and then skipped."""
        while True:
            job, positional, keyword = self.tasks.get()
            try:
                job(*positional, **keyword)
            except Exception as err:
                # Keep the worker alive: just show the error message.
                print(err)
            finally:
                # Always mark the item as processed so Queue.join() can return.
                self.tasks.task_done()

class ThreadPool:
    """Fixed-size group of ``Worker`` threads fed from one shared queue."""

    def __init__(self, num_threads):
        """Create the task queue (capacity ``num_threads``) and spawn the workers."""
        self.tasks = Queue(num_threads)
        for _ in range(num_threads):
            # Each Worker starts itself in its constructor.
            Worker(self.tasks)

    def add_task(self, func, *args, **kargs):
        """Enqueue a call of ``func(*args, **kargs)`` for the workers."""
        job = (func, args, kargs)
        self.tasks.put(job)

    def wait_completion(self):
        """Block until every queued task has been marked done."""
        self.tasks.join()


In [17]:
def foo(n):
    """Add 1 to the global ``shared`` counter ``n`` times, without any locking."""
    global shared
    for _step in range(n):
        # Deliberately unsynchronized read-modify-write of the shared global.
        current = shared
        shared = current + 1

def bar(n):
    """Add 1 to the global ``shared`` counter ``n`` times, without any locking."""
    global shared
    for _step in range(n):
        # Deliberately unsynchronized read-modify-write of the shared global.
        current = shared
        shared = current + 1

In [18]:
def test(n):
    """Reset ``shared``, run ``foo(n)`` and ``bar(n)`` on a 2-thread pool, print the total."""
    global shared
    shared = 0
    workers = ThreadPool(2)
    for task in (foo, bar):
        workers.add_task(task, n)
    # Block until both tasks have finished before reading the counter.
    workers.wait_completion()
    print(shared)

In [None]:
# Two pool tasks each add 1 to `shared` 50 times; 100 is printed if no update is lost.
test(50)

In [None]:
# Same experiment with 500 increments per task; 1000 is printed if no update is lost.
test(500)

In [None]:
# 5_000 increments per task; 10_000 is printed if no update is lost.
test(5_000)

In [None]:
# 50_000 increments per task; 100_000 is printed if no update is lost.
test(50_000)

In [None]:
# 500_000 increments per task; 1_000_000 is printed if no update is lost.
test(500_000)

In [None]:
# 5_000_000 increments per task; 10_000_000 is printed if no update is lost.
test(5_000_000)

## Locks for "Mutual Exclusion"

In [13]:
def locked_foo(n):
    """Add 1 to the global ``shared`` counter ``n`` times, holding ``lock`` per step.

    Both ``shared`` and ``lock`` are module globals that must exist before the
    call. Returns ``None``.
    """
    global shared, lock
    for _ in range(n):
        # ``with`` releases the lock even if the increment raises, so a
        # failure here cannot leave the lock held and block the other task.
        with lock:
            shared = shared + 1

def locked_bar(n):
    """Add 1 to the global ``shared`` counter ``n`` times, holding ``lock`` per step.

    Both ``shared`` and ``lock`` are module globals that must exist before the
    call. Returns ``None``.
    """
    global shared, lock
    for _ in range(n):
        # ``with`` releases the lock even if the increment raises, so a
        # failure here cannot leave the lock held and block the other task.
        with lock:
            shared = shared + 1

In [14]:
def locked_test(n):
    """Reset ``shared``, create a fresh ``lock``, run both locked tasks, print the total."""
    global shared, lock
    lock = threading.Lock()
    shared = 0
    workers = ThreadPool(2)
    for task in (locked_foo, locked_bar):
        workers.add_task(task, n)
    # Block until both tasks have finished before reading the counter.
    workers.wait_completion()
    print(shared)

In [15]:
# Two tasks x 500_000 locked increments; the recorded output below is 1000000.
locked_test(500_000)

1000000


In [19]:
%%timeit -n 1 -r 5
# Unlocked run: each recorded total below is less than 10_000_000.
test(5_000_000)

6848531
6474440
6420116
6748823
6399092
930 ms ± 94.7 ms per loop (mean ± std. dev. of 5 runs, 1 loop each)


In [20]:
%%timeit -n 1 -r 5
# Locked run: every recorded total below is exactly 10000000.
locked_test(5_000_000)

10000000
10000000
10000000
10000000
10000000
3.36 s ± 263 ms per loop (mean ± std. dev. of 5 runs, 1 loop each)


## Locks for syncing

In [108]:
from IPython.display import clear_output
from time import sleep
from random import random

def display(n):
    """Print the global ``count`` ``n`` times, one print per ``lockA`` acquisition.

    Each round acquires ``lockA``, prints ``count``, releases ``lockB`` and
    then sleeps for ``random()`` seconds.
    """
    # The module globals are only read here, so no ``global`` statement is needed.
    for _round in range(n):
        lockA.acquire()  # block until lockA can be taken
        print(count)
        lockB.release()  # make lockB available again
        sleep(random())
        
def fA(n):
    """Increment the global ``count`` ``n`` times, one step per ``lockB`` acquisition.

    Each round sleeps for ``random()`` seconds, acquires ``lockB``, adds 1 to
    ``count`` and then releases ``lockA``.
    """
    global count  # the locks are only read, so only ``count`` needs declaring
    for _round in range(n):
        sleep(random())
        lockB.acquire()  # block until lockB can be taken
        count += 1
        lockA.release()  # make lockA available again


def test(n):
    """Start the ``fA``/``display`` pair for ``n`` rounds each and return at once.

    Resets ``count`` to 0 and creates both locks, with ``lockA`` acquired up
    front. The two threads are started but not joined.

    NOTE: this rebinds the name ``test`` used by the earlier counter demo.
    """
    global count, lockA, lockB
    count = 0
    lockA, lockB = threading.Lock(), threading.Lock()
    lockA.acquire()  # lockA starts out held
    runners = [Thread(target=body, args=(n,)) for body in (fA, display)]
    for runner in runners:
        runner.start()

In [109]:
# Lock hand-off demo: the recorded output below lists 1 through 10, one value per line.
test(10)

1
2
3
4
5
6
7
8
9
10


In [64]:
# Scratch cell: pause for random() seconds.
sleep(random())