File size: 2,481 Bytes
80eea9e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
import asyncio
import random
import time


class Job:
    """A unit of work that travels through the pipeline stages.

    Instances carry two attributes, both taken verbatim from the
    constructor arguments: ``id`` and ``data``.
    """

    def __init__(self, id, data):
        # Store both constructor arguments unchanged on the instance.
        self.id, self.data = id, data


async def node1(worker_id: int, input_queue, output_queue):
    """First pipeline stage: tag each job and pass it on.

    Loops forever (until the task is cancelled): takes a job from
    ``input_queue``, appends a marker naming this stage and ``worker_id``
    to ``job.data``, then puts the job on ``output_queue``.
    """
    # The marker depends only on worker_id, so build it once up front.
    marker = f' (processed by node 1, worker {worker_id})'
    while True:
        item = await input_queue.get()
        item.data += marker
        await output_queue.put(item)

async def node2(worker_id: int, input_queue, output_queue, *,
                base_delay: float = 0.8, jitter: float = 0.4):
    """Second pipeline stage: simulate slow work, then tag and forward the job.

    Loops forever (until the task is cancelled): takes a job from
    ``input_queue``, sleeps for a random duration, appends a marker naming
    this stage and ``worker_id`` to ``job.data``, then puts the job on
    ``output_queue``.

    Args:
        worker_id: Identifier embedded in the marker text.
        input_queue: Queue the jobs are awaited from.
        output_queue: Queue the processed jobs are put on.
        base_delay: Minimum sleep per job, in seconds.
        jitter: Width of the random range added to ``base_delay``; each
            sleep lasts ``base_delay + jitter * r`` with ``r`` drawn from
            ``random.random()`` (i.e. in ``[0.0, 1.0)``).

    With the defaults the sleep is between 0.8 and 1.2 seconds, exactly as
    when the values were hard-coded.
    """
    while True:
        job = await input_queue.get()
        sleep_duration = base_delay + jitter * random.random()
        await asyncio.sleep(sleep_duration)
        job.data += f' (processed by node 2, worker {worker_id})'
        await output_queue.put(job)

async def node3(worker_id: int, input_queue, job_sync):
    """Final pipeline stage: restore job order and record the results.

    Jobs may arrive from ``input_queue`` in any order. Each one is parked
    in a local dict keyed by ``job.id``; whenever the next expected id
    (starting at 0 and counting up by one) is present, that job is tagged
    with this stage's marker, printed, and appended to ``job_sync``.
    Loops forever (until the task is cancelled).
    """
    pending = {}
    expected_id = 0
    while True:
        arrived = await input_queue.get()
        pending[arrived.id] = arrived
        # Flush every job that is now contiguous with what was already emitted.
        while expected_id in pending:
            ready = pending.pop(expected_id)
            expected_id += 1
            ready.data += f' (processed by node 3, worker {worker_id})'
            print(f'{ready.id} - {ready.data}')
            job_sync.append(ready)

async def main():
    """Run 100 jobs through the three-stage pipeline and wait for completion.

    Wiring: ``input_queue`` -> one ``node1`` task -> ``buffer_queue`` ->
    five ``node2`` tasks (worker ids 1-5) -> ``output_queue`` -> one
    ``node3`` task, which appends finished jobs to ``job_sync``. ``node1``
    and ``node3`` are started with ``None`` as their worker id.

    The function returns once ``job_sync`` holds all jobs. All worker tasks
    are cancelled and awaited before returning or raising.

    Raises:
        Exception: Whatever exception a worker task raised. Previously a
            crashed worker left this function polling forever.
        RuntimeError: If a worker task returns before all jobs are done.
    """
    input_queue = asyncio.Queue()
    buffer_queue = asyncio.Queue()
    output_queue = asyncio.Queue()

    num_jobs = 100
    num_workers = 5
    job_source = [Job(i, "") for i in range(num_jobs)]
    job_sync = []

    task1 = asyncio.create_task(node1(None, input_queue, buffer_queue))
    task3 = asyncio.create_task(node3(None, output_queue, job_sync))
    tasks2 = [
        asyncio.create_task(node2(i + 1, buffer_queue, output_queue))
        for i in range(num_workers)
    ]
    workers = [task1, *tasks2, task3]

    try:
        for job in job_source:
            await input_queue.put(job)

        while len(job_sync) < num_jobs:
            # Doubles as the 0.1 s poll interval, but wakes early if a
            # worker task finishes.
            finished, _ = await asyncio.wait(
                workers, timeout=0.1, return_when=asyncio.FIRST_COMPLETED
            )
            for task in finished:
                # Workers loop forever, so a finished one has failed:
                # result() re-raises its exception.
                task.result()
                raise RuntimeError(
                    "pipeline worker exited before all jobs were processed"
                )
    except asyncio.CancelledError:
        # Kept from the original: report the cancellation, then fall
        # through to the cleanup below.
        print("Pipeline cancelled")
    finally:
        for task in workers:
            task.cancel()
        # return_exceptions=True collects the CancelledError from each
        # worker instead of raising it here.
        await asyncio.gather(*workers, return_exceptions=True)


# Script entry: run the pipeline once and report the wall-clock duration.
start_time = time.time()

try:
    asyncio.run(main())
except KeyboardInterrupt:
    # Ctrl-C is reported here; the elapsed time below is still printed.
    print("Pipeline interrupted by user")

end_time = time.time()
elapsed = end_time - start_time
print(f"Pipeline processed in {elapsed} seconds.")