Breaking the Mold: Concurrency Without Threads or Asyncio

Let’s explore the roots of asyncio.

Rahul Beniwal
Level Up Coding


Hey Everyone,

Today, while delving into Python documentation and catching up on some PyCon talks, I stumbled upon intriguing insights into event loops and asyncio. This prompted me to share my findings in this blog post.

In this post, we’ll explore an alternative approach to achieving concurrency using generators. If you’re unfamiliar with generators, feel free to brush up on them before diving in. In essence, a generator is a function whose execution pauses at each yield and can be resumed later with its local state intact, unlike a regular function, whose stack frame is discarded when it returns.
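As a quick refresher, here is a minimal sketch of that pause-and-resume behavior. The countdown function is a hypothetical example, not part of the server code that follows.

```python
def countdown(n):
    """Yield n, n-1, ..., 1, pausing at each yield."""
    while n > 0:
        yield n  # execution pauses here until next() is called again
        n -= 1


gen = countdown(3)   # nothing runs yet; this only creates the generator object
first = next(gen)    # runs the body up to the first yield
second = next(gen)   # resumes right after the yield; the local n is preserved
rest = list(gen)     # drains whatever values are left

print(first, second, rest)
```

The important part for this article is that the caller decides when the generator resumes, which is exactly the hook a scheduler needs.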


Step 1 -> Creating A Fib Server.

Let’s create a server that computes the requested Fibonacci number and sends it back to the client.

from socket import *


def fib(n):
    if n <= 2:
        return 1
    else:
        return fib(n - 1) + fib(n - 2)


def fib_server(address):
    sock = socket(AF_INET, SOCK_STREAM)
    sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
    sock.bind(address)
    sock.listen(5)

    while True:
        client, addr = sock.accept()
        print("Connection", addr)
        fib_handler(client)


def fib_handler(client):
    while True:
        req = client.recv(100)
        if not req:
            break
        n = int(req)
        result = fib(n)
        resp = str(result).encode("ascii") + b"\n"
        client.send(resp)
    print("Connection Closed")


fib_server(("", 25000))

First, run the server.

python3 server.py

Second, use nc (netcat) to connect to it.

nc localhost 25000

Now type a number and the server replies with the corresponding Fibonacci number:

(venv) ~/python/pycons/concurrency 06:30:03 $ nc localhost 25000
10
55
10
55
12
144
20
6765

You can close the session with ^C (Ctrl+C).

Step 2 -> Testing Fib Server With Multiple Clients

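Let’s open two terminal sessions and try to interact with the server. I’ve started two sessions, but only one is responding, so there’s no concurrency: fib_server calls fib_handler directly, and fib_handler does not return until its client disconnects, so the server never gets back to accept() in the meantime. As soon as you close the first session, the second one starts getting responses.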

Step 3 -> Concurrency with Generators.

from socket import *
from collections import deque
from select import select

tasks = deque()
recv_wait = {}
send_wait = {}


def run():
    while any([tasks, recv_wait, send_wait]):
        while not tasks:
            # No runnable tasks: block in select() until some socket is ready
            can_recv, can_send, _ = select(recv_wait, send_wait, [])
            for s in can_recv:
                tasks.append(recv_wait.pop(s))
            for s in can_send:
                tasks.append(send_wait.pop(s))
        # Run the next task until it yields again
        task = tasks.popleft()
        try:
            why, what = next(task)
            if why == "recv":
                # Park the task until the socket `what` is readable
                recv_wait[what] = task
            elif why == "send":
                # Park the task until the socket `what` is writable
                send_wait[what] = task
            else:
                raise RuntimeError("ARG!")
        except StopIteration:
            # The task has finished, so it is not rescheduled
            pass


def fib(n):
    if n <= 2:
        return 1
    else:
        return fib(n - 1) + fib(n - 2)


def fib_server(address):
    sock = socket(AF_INET, SOCK_STREAM)
    sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
    sock.bind(address)
    sock.listen(5)

    while True:
        yield "recv", sock
        client, addr = sock.accept()
        print("Connection", addr)
        tasks.append(fib_handler(client))


def fib_handler(client):
    while True:
        yield "recv", client
        req = client.recv(100)
        if not req:
            break
        n = int(req)
        result = fib(n)
        resp = str(result).encode("ascii") + b"\n"
        yield "send", client
        client.send(resp)
    print("Connection Closed")


tasks.append(fib_server(("", 25000)))
run()

With this version running, I can now interact with the server from multiple terminals at the same time. Let’s break down the code.

Code Breakdown

  • tasks: a queue of the tasks (generators) that are ready to run.
  • recv_wait: maps each socket to the task that is waiting to receive data on it.
  • send_wait: maps each socket to the task that is waiting to send data on it.
  • why, what: the pair each task yields. why says why the task is pausing ("recv" or "send"); what is the socket on which the operation will be performed.
  • select: a wrapper over the select system call. It blocks until at least one of the given sockets is ready and returns the lists of sockets that are ready for reading and for writing.
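To make the role of select more concrete, here is a small sketch that is separate from the server. It assumes socket.socketpair() as a stand-in for a real client connection, and the timeouts are only there so the example never hangs.

```python
import socket
from select import select

# socketpair() returns two already-connected sockets, so no server is needed.
a, b = socket.socketpair()

# Nothing has been sent yet, so `a` is not readable. A timeout of 0 makes
# select() return immediately instead of blocking.
readable_before, _, _ = select([a], [], [], 0)

b.sendall(b"10")

# Now data is waiting on `a`, so select() reports it as readable.
# The 1-second timeout is only a safety net.
readable_after, _, _ = select([a], [], [], 1)

data = a.recv(100)  # guaranteed not to block: select() said data is ready
print(readable_before, data)

a.close()
b.close()
```

This is the check that run() relies on: a task is only resumed once select() has reported its socket as ready, so the recv or send that follows the yield does not block the other tasks.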

The code above boils down to three steps:

  • Identify the blocking tasks.
  • Offload the blocking tasks.
  • Poll the tasks and process them when ready.
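Stripped of the socket handling, the core of run() is a queue of generators that are resumed one at a time. The sketch below illustrates only that part. The names worker and log are hypothetical and not part of the server code, and there is no select() call because nothing here blocks.

```python
from collections import deque

tasks = deque()
log = []


def worker(name, steps):
    """A task that does `steps` units of work, yielding after each one."""
    for i in range(steps):
        log.append(f"{name}{i}")
        yield  # hand control back to the scheduler


def run():
    # Round-robin: run each task up to its next yield, then requeue it.
    while tasks:
        task = tasks.popleft()
        try:
            next(task)
        except StopIteration:
            continue  # the task finished, so it is not requeued
        tasks.append(task)


tasks.append(worker("a", 2))
tasks.append(worker("b", 3))
run()
print(log)  # ['a0', 'b0', 'a1', 'b1', 'b2']
```

The two workers interleave even though there is only one thread. The server version adds one idea on top: instead of requeuing a task immediately, it parks the task in recv_wait or send_wait until select() says its socket is ready.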

Bonus Step -> Concurrency with Threads

Let’s implement the same behavior with threads.

from socket import *
from threading import Thread


def fib(n):
    if n <= 2:
        return 1
    else:
        return fib(n - 1) + fib(n - 2)


def fib_server(address):
    sock = socket(AF_INET, SOCK_STREAM)
    sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
    sock.bind(address)
    sock.listen(5)

    while True:
        client, addr = sock.accept()
        print("Connection", addr)
        Thread(target=fib_handler, args=(client,)).start()


def fib_handler(client):
    while True:
        req = client.recv(100)
        if not req:
            break
        n = int(req)
        result = fib(n)
        resp = str(result).encode("ascii") + b"\n"
        client.send(resp)
    print("Connection Closed")


fib_server(("", 25000))

Yes, this code is simple and clean, and it gives the same output: each client is served by its own thread, so multiple terminals can talk to the server at once.

Conclusion

In conclusion, this article aimed to showcase the power of generators in enabling concurrency without the overhead of threads. While async/await-based concurrency is built on coroutines rather than plain generators, the ideas demonstrated in the code above provide a foundation for understanding how features like event loops are implemented.
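For comparison, here is a rough sketch of the same handler written with asyncio streams. It is an illustration rather than a drop-in replacement: the demo function, the use of port 0, and the in-process client are assumptions made so the example is self-contained and terminates on its own.

```python
import asyncio


def fib(n):
    return 1 if n <= 2 else fib(n - 1) + fib(n - 2)


async def fib_handler(reader, writer):
    # Each await plays the role of a yield in the generator version:
    # it is a point where the event loop may switch to another client.
    while True:
        req = await reader.read(100)
        if not req:
            break
        # fib() itself is plain CPU work and is not interrupted by the loop.
        writer.write(str(fib(int(req))).encode("ascii") + b"\n")
        await writer.drain()
    writer.close()
    await writer.wait_closed()


async def demo():
    # Port 0 asks the OS for any free port, so this cannot clash with
    # a server already listening on 25000.
    server = await asyncio.start_server(fib_handler, "127.0.0.1", 0)
    port = server.sockets[0].getsockname()[1]
    async with server:
        reader, writer = await asyncio.open_connection("127.0.0.1", port)
        writer.write(b"10")
        await writer.drain()
        reply = await reader.readline()
        writer.close()
        await writer.wait_closed()
    return reply


reply = asyncio.run(demo())
print(reply)  # b'55\n'
```

The bookkeeping that run(), recv_wait, and send_wait did by hand is what the event loop does here behind each await.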

The code presented here was inspired by a renowned talk by David Beazley. For those interested in delving deeper, I highly recommend exploring the complete talk.

Hope you like this article. Follow Rahul Beniwal for more articles.
