While the list data structure is incredibly useful, both implementations we explored (array-backed and linked) have operations that run in $O(N)$ time, which makes them non-ideal for use with large, growing collections of data.
By further restricting the list API, however — in particular, by isolating points of access to either the front or end of the data set — we can create data structures whose operations are all $O(1)$, and remain very useful in their own right.
Stacks are linear data structures which only permit access to one "end" of the data collection. We can only append ("push") items onto the tail end (a.k.a. the "top") of a stack, and only the most recently added item can be removed ("popped"). The last item to be pushed onto a stack is therefore the first one to be popped off, which is why we refer to stacks as last-in, first-out (LIFO) structures.
# array-backed implementation
class Stack:
    """LIFO stack that keeps its items in a Python list; the list's end is the top."""

    def __init__(self):
        self.data = []

    def push(self, val):
        """Place ``val`` on top of the stack."""
        self.data.append(val)

    def pop(self):
        """Remove and return the top item; asserts that the stack is non-empty."""
        assert self.data
        top = self.data.pop()
        return top

    def empty(self):
        """Return True when the stack holds no items."""
        return not self.data
# Demo: fill a stack with 0..9, then pop the top item.
s = Stack()
for x in range(10):
    s.push(x)
s.pop()
# linked implementation
class Stack:
    """LIFO stack built from singly linked nodes; ``top`` references the newest node."""

    class Node:
        """One stack cell: a value plus a link to the node beneath it."""

        def __init__(self, val, next=None):
            self.val = val
            self.next = next

    def __init__(self):
        self.top = None

    def push(self, val):
        """Link a new node holding ``val`` in front of the current top."""
        node = Stack.Node(val)
        node.next = self.top
        self.top = node

    def pop(self):
        """Unlink the top node and return its value; asserts that the stack is non-empty."""
        assert self.top
        node = self.top
        self.top = node.next
        return node.val

    def empty(self):
        """Return True when there is no top node."""
        return self.top is None
# Demo: the linked stack is driven exactly like the array-backed one.
s = Stack()
for x in range(10):
    s.push(x)
s.pop()
e.g., '(1 + 2 * (3 - (4 / 2) + 5) - (6 + 1))'
def check_parens(expr):
    """Report whether the parentheses in ``expr`` are balanced.

    Every '(' is pushed onto a stack and every ')' pops one entry; all
    other characters are ignored.

    Args:
        expr: the string to scan.

    Returns:
        True if each ')' has an earlier unmatched '(' and no '(' is left
        open at the end of the string; False otherwise.
    """
    s = Stack()
    for c in expr:
        if c == '(':
            s.push(c)
        elif c == ')':
            # A ')' with nothing open is unbalanced. Testing emptiness
            # explicitly, rather than catching whatever pop() raises,
            # keeps unrelated errors from being silently swallowed.
            if s.empty():
                return False
            s.pop()
    return s.empty()
# Balanced inputs: each of these returns True.
check_parens('()')
check_parens('((()))')
check_parens('()(()()(()))')
# Unbalanced inputs: an unclosed '(' and a stray ')' both return False.
check_parens('(')
check_parens('())')
# Non-parenthesis characters are ignored, so this returns True.
check_parens('(1 + 2 * (3 - (4 / 2) + 5) - (6 + 1))')
e.g., '(1 + 2) * 5'
$\rightarrow$ '1 2 + 5 *'
def eval_postfix(expr):
    """Evaluate a whitespace-separated postfix (reverse Polish) expression.

    Tokens are split on whitespace. The tokens '+', '-', '*' and '/' are
    binary operators ('/' is true division); every other token is
    converted with int() and pushed as an operand.

    Args:
        expr: the postfix expression, e.g. '1 2 + 5 *'.

    Returns:
        The value left on top of the stack after all tokens are processed.

    Raises:
        ValueError: if a non-operator token is not a valid integer literal.
    """
    s = Stack()
    for t in expr.split():
        if t in ('+', '-', '*', '/'):
            # The right-hand operand was pushed last, so it comes off first;
            # the order matters for the non-commutative '-' and '/'.
            right = s.pop()
            left = s.pop()
            if t == '+':
                s.push(left + right)
            elif t == '-':
                s.push(left - right)
            elif t == '*':
                s.push(left * right)
            else:
                s.push(left / right)
        else:
            s.push(int(t))
    return s.pop()
# (1 + 2) * 5
eval_postfix('1 2 + 5 *')
# ((1 + 2) * (3 + 2)) * 10
eval_postfix('1 2 + 3 2 + * 10 *')
# Maze text consumed by parse_maze, which accepts the characters '#', ' ', 'I' and 'O'.
# NOTE(review): the rows below have unequal lengths as written; presumably
# interior runs of spaces were collapsed when this file was extracted —
# TODO confirm against the original notebook.
maze_str = """######
I #
# ## #
# ####
# O
######"""
def parse_maze(maze_str):
    """Convert maze text into a grid (list of rows) of integer cell codes.

    The text is split on newlines, each line is stripped of surrounding
    whitespace, and every remaining character is replaced by its position
    in '# IO' ('#' -> 0, ' ' -> 1, 'I' -> 2, 'O' -> 3). Any other
    character raises ValueError.
    """
    code_of = '# IO'.index
    return [[code_of(ch) for ch in row.strip()]
            for row in maze_str.split('\n')]
def print_maze(grid):
    """Print the grid one row per line, drawing each cell code as the
    character at that index of '# IO!+'."""
    glyphs = '# IO!+'
    for row in grid:
        line = ''.join([glyphs[cell] for cell in row])
        print(line)
# Show the parsed grid of integer cell codes.
parse_maze(maze_str)
maze = parse_maze(maze_str)
# Codes 5 and 4 are not produced by parse_maze; print_maze draws them as '+' and '!'.
maze[1][0] = 5
maze[1][1] = 4
print_maze(maze)
class Move:
    """A single step from one maze location to another; each location is an
    indexable pair whose first two items appear in the repr."""

    def __init__(self, frm, to):
        self.frm, self.to = frm, to

    def __repr__(self):
        template = '({},{}) -> ({},{})'
        src_a, src_b = self.frm[0], self.frm[1]
        dst_a, dst_b = self.to[0], self.to[1]
        return template.format(src_a, src_b, dst_a, dst_b)
def moves(maze, place):
    """List the Moves from ``place`` to each adjacent cell that may be entered.

    Neighbors are tried in the order row-1, row+1, col-1, col+1. A neighbor
    qualifies when its row index is inside the grid, its column index is
    inside the first row's length, and its cell code is 1, 2 or 3.
    """
    found = []
    for d_row, d_col in ((-1, 0), (1, 0), (0, -1), (0, 1)):
        row = place[0] + d_row
        if row not in range(len(maze)):
            continue
        col = place[1] + d_col
        if col not in range(len(maze[0])):
            continue
        if maze[row][col] not in (1, 2, 3):
            continue
        found.append(Move(place, (row, col)))
    return found
from time import sleep
from IPython.display import clear_output
def visit(maze, loc):
    """Flag the cell at ``loc`` as visited by storing code 5 there."""
    row, col = loc[0], loc[1]
    maze[row][col] = 5
def mark(maze, loc):
    """Tag the cell at ``loc`` as being of interest (code 4); cells holding
    code 3 are left untouched."""
    row = maze[loc[0]]
    if row[loc[1]] == 3:
        return
    row[loc[1]] = 4
def display(maze):
    """Clear the previous notebook output, print the maze, and pause briefly."""
    # NOTE(review): the positional True is presumably IPython's `wait` flag
    # (hold the clear until new output arrives) — confirm against IPython docs.
    clear_output(True)
    print_maze(maze)
    sleep(0.5)  # half-second pause after each frame
def solve_maze(maze, entry):
    """Search ``maze`` for a cell holding code 3, starting from ``entry``.

    Pending moves are stored and retrieved through the module-level helpers
    save_move / next_move / out_of_moves, so the order in which cells are
    explored depends on how those helpers are currently defined. The maze
    is modified in place (cells are set to 5 by visit and 4 by mark) and is
    redrawn via display before each explored step.

    Returns:
        True as soon as a retrieved move lands on a code-3 cell; False if
        the pending moves run out first.
    """
    # Seed the pending moves with everything reachable from the entry.
    for m in moves(maze, entry):
        save_move(m)
    visit(maze, entry)
    while not out_of_moves():
        move = next_move()
        # Reached a code-3 cell: stop without visiting or redrawing it.
        if maze[move.to[0]][move.to[1]] == 3:
            return True
        display(maze)
        visit(maze, move.to)
        # Marking a neighbor (code 4) keeps moves() from offering it again,
        # since moves() only accepts codes 1, 2 and 3.
        for m in moves(maze, move.to):
            mark(maze, m.to)
            save_move(m)
    return False
# Stack-backed store of pending moves for solve_maze: the most recently
# saved move is the next one returned (LIFO).
move_stack = Stack()
def save_move(move):
    """Push ``move`` onto the shared move stack."""
    move_stack.push(move)
def next_move():
    """Pop and return the most recently saved move."""
    return move_stack.pop()
def out_of_moves():
    """Return True when no saved moves remain."""
    return move_stack.empty()
# Run the solver on the maze defined earlier, entering at row 1, column 0.
solve_maze(parse_maze(maze_str), (1, 0))
# NOTE(review): the rows of the maze literals below have unequal lengths as
# written; presumably interior runs of spaces were collapsed when this file
# was extracted — TODO confirm against the original notebook.
maze_str = """#################
I # # #
# ##### # # # # #
# # # # # # #
# ### ### # # ###
# # # O
#################"""
solve_maze(parse_maze(maze_str), (1, 0))
maze_str = """#################
I #
# # # # # # # # #
# # # # # # # # #
# ###############
# O
#################"""
solve_maze(parse_maze(maze_str), (1, 0))
Queues are linear data structures wherein we are only permitted to append ("enqueue") items onto the rear, and remove ("dequeue") items from the front. The oldest item still in a queue is therefore the next one to be dequeued, which is why we refer to a queue as a first-in, first-out (FIFO) structure. It is helpful to think of a queue as being the model for a line at a typical supermarket checkout aisle (first customer in, first customer to be checked out).
# array-backed implementation
class Queue:
    """FIFO queue backed by a Python list; index 0 is the front."""

    def __init__(self):
        self.data = []

    def enqueue(self, val):
        """Append ``val`` at the rear of the queue."""
        self.data.append(val)

    def dequeue(self): # O(N) runtime
        """Remove and return the front item; asserts that the queue is non-empty.

        Removing index 0 of a list shifts every remaining element, which is
        where the O(N) cost comes from.
        """
        assert self.data
        return self.data.pop(0)

    def empty(self):
        """Return True when the queue holds no items."""
        # This class stores its items in self.data and never defines a
        # `head` attribute, so emptiness must be judged from the list.
        return len(self.data) == 0
# Demo: enqueue 0..9, then dequeue the item at the front.
q = Queue()
for x in range(10):
    q.enqueue(x)
q.dequeue()
# linked implementation
class Queue:
    """FIFO queue built from singly linked nodes, with references to both ends."""

    class Node:
        """One queue cell: a value plus a link to the node behind it."""

        def __init__(self, val, next=None):
            self.val = val
            self.next = next

    def __init__(self):
        self.head = None
        self.tail = None

    def enqueue(self, val):
        """Attach a new node holding ``val`` at the rear."""
        node = Queue.Node(val)
        if self.tail:
            self.tail.next = node
        else:
            # First item: it is both the front and the rear.
            self.head = node
        self.tail = node

    def dequeue(self): # O(1) runtime
        """Detach the front node and return its value; asserts that the queue is non-empty."""
        assert self.head
        front = self.head
        self.head = front.next
        if self.head is None:
            # The queue just became empty, so drop the stale rear reference.
            self.tail = None
        return front.val

    def empty(self):
        """Return True when there is no front node."""
        return self.head is None
# Demo: the linked queue is driven exactly like the array-backed one.
q = Queue()
for x in range(10):
    q.enqueue(x)
q.dequeue()
# Rebind the pending-move helpers that solve_maze calls so they use a queue:
# the oldest saved move is now the next one returned (FIFO).
move_queue = Queue()
def save_move(move):
    """Enqueue ``move`` on the shared move queue."""
    move_queue.enqueue(move)
def next_move():
    """Dequeue and return the oldest saved move."""
    return move_queue.dequeue()
def out_of_moves():
    """Return True when no saved moves remain."""
    return move_queue.empty()
# Re-run the same two mazes now that the pending moves are queue-backed.
# NOTE(review): the rows of the maze literals below have unequal lengths as
# written; presumably interior runs of spaces were collapsed when this file
# was extracted — TODO confirm against the original notebook.
maze_str = """#################
I # # #
# ##### # # # # #
# # # # # # #
# ### ### # # ###
# # # O
#################"""
solve_maze(parse_maze(maze_str), (1, 0))
maze_str = """#################
I #
# # # # # # # # #
# # # # # # # # #
# ###############
# O
#################"""
solve_maze(parse_maze(maze_str), (1, 0))
from threading import Thread, Lock
from time import sleep
import random
# Guards every access to the shared work queue.
lock = Lock()


def worker_fn(cid, q):
    """Consumer loop: repeatedly take an item from ``q`` and process it.

    When the queue is empty the worker sleeps for one second and tries
    again. The item 'Stop' makes the worker print a message and return;
    any other item is reported and followed by a random pause of under one
    second.

    Args:
        cid: identifier printed in this consumer's messages.
        q: shared queue providing empty() and dequeue().
    """
    while True:
        # Test and dequeue under a single lock acquisition so another
        # consumer cannot take the item in between. Checking empty()
        # explicitly (instead of catching every exception from dequeue)
        # lets real errors surface rather than being retried forever.
        with lock:
            have_work = not q.empty()
            if have_work:
                work = q.dequeue()
        if not have_work:  # queue is empty
            sleep(1)
            continue
        if work == 'Stop':
            print('Consumer', cid, 'stopping.')
            break
        else:
            print('Consumer', cid, 'processing', work)
            sleep(random.random())
work_queue = Queue()
# Start five consumer threads that all pull from the same queue.
for i in range(5):
    Thread(target=worker_fn, args=(i, work_queue)).start()
import threading
threading.active_count()
# Producer side: add ten work items, taking the same lock the consumers use.
for i in range(10):
    with lock:
        work_queue.enqueue(i)
# One 'Stop' item per consumer thread started above.
for i in range(5):
    with lock:
        work_queue.enqueue('Stop')
from random import randint
from time import sleep
task_queue = Queue()
# Seed three jobs, each a (name, time_left) pair with a random time of 3 to 6.
for i in range(3):
    task_queue.enqueue(('Job {}'.format(i), randint(3, 6)))
# Walk the linked nodes directly to print the contents without dequeuing.
n = task_queue.head
while n:
    print(n.val)
    n = n.next
# Each pass runs the front job for one second; an unfinished job goes back
# to the rear of the queue with its remaining time.
while not task_queue.empty():
    job, time_left = task_queue.dequeue()
    print('Running', job)
    sleep(1)
    time_left -= 1
    if time_left > 0:
        print('Re-queueuing', job, 'with remaining time =', time_left)
        task_queue.enqueue((job, time_left))
    else:
        print('*', job, 'done')
Stack & Queue implementations: