
Python Multiprocessing Example – Parallelize Your Code


Python’s multiprocessing support lets you run multiple processes in parallel, putting all of your CPU cores to work and significantly speeding up computation-heavy tasks. Unlike threading, which is constrained by the Global Interpreter Lock (GIL), multiprocessing creates separate Python interpreter processes that can truly run at the same time. This article walks you through implementing multiprocessing in your Python applications, explores the available approaches, addresses common pitfalls, and shows how to tune performance for practical scenarios.

Understanding How Python Multiprocessing Functions

The multiprocessing module spawns separate Python processes, each with its own interpreter and its own memory space. This design sidesteps the GIL, allowing true parallel execution: when you start processes, the operating system schedules them across the available CPU cores.
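
To make the separate-memory point concrete, here is a minimal sketch (the show_identity function name is illustrative): the child process works on its own copy of the list, so mutations made in the child never appear in the parent.

import multiprocessing
import os

def show_identity(values):
    # Runs in a separate interpreter with its own memory; the append stays local
    values.append(os.getpid())
    print(f"Child PID {os.getpid()} sees: {values}")

if __name__ == "__main__":
    data = [1, 2, 3]
    p = multiprocessing.Process(target=show_identity, args=(data,))
    p.start()
    p.join()
    print(f"Parent PID {os.getpid()} still sees: {data}")  # the parent's list is unchanged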

The primary elements include:

  • Process – A standalone worker process dedicated to executing your functions.
  • Pool – Oversees a collection of worker processes to manage task distribution.
  • Queue – A process-safe FIFO used to pass data safely between processes.
  • Pipe – A two-way communication channel between two processes (a short sketch follows this list).
  • Lock – A synchronization tool to avert race conditions.
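
Pipe is the only primitive in this list that the later examples do not demonstrate, so here is a minimal sketch of two processes exchanging messages over a Pipe (the child_task function is illustrative):

import multiprocessing

def child_task(conn):
    # Receive a message from the parent, send a reply, then close this end
    message = conn.recv()
    conn.send(f"child received: {message}")
    conn.close()

if __name__ == "__main__":
    parent_conn, child_conn = multiprocessing.Pipe()
    p = multiprocessing.Process(target=child_task, args=(child_conn,))
    p.start()
    parent_conn.send("hello")
    print(parent_conn.recv())  # -> "child received: hello"
    p.join()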

A Comprehensive Guide to Implementation

Let’s start with a straightforward example that highlights the difference between sequential and parallel processing:

import multiprocessing
import time
import math

def cpu_intensive_task(n):
    """Simulate CPU-intensive operations"""
    result = 0
    for i in range(n):
        result += math.sqrt(i)
    return result

def sequential_processing():
    start_time = time.time()
    numbers = [1000000] * 4
    results = []

    for num in numbers:
        results.append(cpu_intensive_task(num))

    end_time = time.time()
    print(f"Time taken for sequential processing: {end_time - start_time:.2f} seconds")
    return results

def parallel_processing():
    start_time = time.time()
    numbers = [1000000] * 4

    with multiprocessing.Pool() as pool:
        results = pool.map(cpu_intensive_task, numbers)

    end_time = time.time()
    print(f"Time taken for parallel processing: {end_time - start_time:.2f} seconds")
    return results

if __name__ == "__main__":
    sequential_processing()
    parallel_processing()

Next, we offer a more sophisticated example utilising the Process class for enhanced control:

import multiprocessing
import os
import queue  # provides queue.Empty, raised when a timed get() finds nothing
import time

def worker_function(name, shared_queue, lock):
    """Worker function that processes items from a shared queue."""
    process_id = os.getpid()

    while True:
        try:
            # Retrieve an item from the queue, waiting up to one second
            item = shared_queue.get(timeout=1)

            if item is None:  # Poison pill to terminate the worker
                break

            # Simulate processing
            time.sleep(0.1)

            # Print safely, one process at a time
            with lock:
                print(f"Process {name} (PID: {process_id}) processed: {item}")

        except queue.Empty:
            # No work arrived within the timeout; exit
            break

def main():
    # Initialise shared resources
    task_queue = multiprocessing.Queue()
    print_lock = multiprocessing.Lock()

    # Enqueue tasks
    for i in range(20):
        task_queue.put(f"Task-{i}")

    # Instantiate and launch worker processes
    processes = []
    num_workers = multiprocessing.cpu_count()

    for i in range(num_workers):
        p = multiprocessing.Process(
            target=worker_function,
            args=(f"Worker-{i}", task_queue, print_lock)
        )
        p.start()
        processes.append(p)

    # multiprocessing.Queue has no join(); instead, add one poison pill per worker
    # so each worker exits once the real tasks have been drained
    for _ in range(num_workers):
        task_queue.put(None)

    # Await completion of all processes
    for p in processes:
        p.join()

    print("All tasks have been finished!")

if __name__ == "__main__":
    main()

Practical Applications and Scenarios

Here are real-life scenarios in which multiprocessing proves highly advantageous:

Web Scraping with Multiprocessing

import multiprocessing
import requests
import time

def scrape_url(url):
    """Scrape a single URL and gather basic information."""
    try:
        response = requests.get(url, timeout=5)
        return {
            'url': url,
            'status_code': response.status_code,
            'content_length': len(response.content),
            'response_time': response.elapsed.total_seconds()
        }
    except Exception as e:
        return {'url': url, 'error': str(e)}

def parallel_scraping(urls, num_workers=4):
    """Scrape multiple URLs concurrently."""
    start_time = time.time()

    with multiprocessing.Pool(processes=num_workers) as pool:
        results = pool.map(scrape_url, urls)

    end_time = time.time()
    print(f"Scraped {len(urls)} URLs in {end_time - start_time:.2f} seconds")
    return results

# Example usage
if __name__ == "__main__":
    urls = [
        'https://httpbin.org/delay/1',
        'https://httpbin.org/delay/2',
        'https://httpbin.org/status/200',
        'https://httpbin.org/json',
    ] * 5  # Total of 20 URLs

    results = parallel_scraping(urls)

    # Process the results
    successful = [r for r in results if 'error' not in r]
    errors = [r for r in results if 'error' in r]

    print(f"Successful requests: {len(successful)}, Errors: {len(errors)}")

Data Processing Pipeline

import multiprocessing
import json
import time
from pathlib import Path

def process_json_file(file_path):
    """Process an individual JSON file."""
    try:
        with open(file_path, 'r') as f:
            data = json.load(f)

        # Simulate data processing (e.g., data transformation)
        processed_data = {
            'file': file_path.name,
            'record_count': len(data) if isinstance(data, list) else 1,
            'size_kb': file_path.stat().st_size / 1024,
            'processed_at': time.time()
        }

        return processed_data

    except Exception as e:
        return {'file': file_path.name, 'error': str(e)}

def batch_process_files(directory_path, max_workers=None):
    """Process all JSON files in a specific directory using multiprocessing."""
    json_files = list(Path(directory_path).glob('*.json'))

    if not json_files:
        print("No JSON files found.")
        return []

    if max_workers is None:
        max_workers = min(len(json_files), multiprocessing.cpu_count())

    print(f"Processing {len(json_files)} files with {max_workers} workers.")

    start_time = time.time()

    with multiprocessing.Pool(processes=max_workers) as pool:
        results = pool.map(process_json_file, json_files)

    end_time = time.time()
    print(f"Processing completed in {end_time - start_time:.2f} seconds.")

    return results

Comparative Performance Analysis

The following benchmark evaluates various methodologies:

Approach        | CPU Cores Utilised  | Memory Consumption | Ideal For                              | Drawbacks
----------------|---------------------|--------------------|----------------------------------------|---------------------------------------------------------
Sequential      | 1                   | Low                | I/O-bound tasks, simple scripts        | Does not leverage multiple cores
Threading       | 1 (due to GIL)      | Medium             | I/O-bound tasks, concurrent operations | The GIL restricts true parallelism
Multiprocessing | All available cores | High               | CPU-intensive tasks                    | Increased memory usage and IPC overhead
AsyncIO         | 1                   | Low                | High-concurrency I/O tasks             | Single-threaded execution, requires async/await syntax

The following script benchmarks the three approaches on a CPU-bound task; on a four-core system, multiprocessing should come out well ahead of the sequential and threaded versions:

import multiprocessing
import time
import threading

def cpu_task(n):
    # Defined at module level so pool workers can pickle and import it
    return sum(i * i for i in range(n))

def benchmark_methods():
    numbers = [100000] * 8

    # Sequential processing
    start = time.time()
    seq_results = [cpu_task(n) for n in numbers]
    seq_time = time.time() - start

    # Multiprocessing
    start = time.time()
    with multiprocessing.Pool() as pool:
        mp_results = pool.map(cpu_task, numbers)
    mp_time = time.time() - start

    # Threading (for comparative purposes)
    start = time.time()
    threads = []
    thread_results = []

    def thread_worker(n):
        thread_results.append(cpu_task(n))

    for n in numbers:
        t = threading.Thread(target=thread_worker, args=(n,))
        threads.append(t)
        t.start()

    for t in threads:
        t.join()

    thread_time = time.time() - start

    print(f"Sequential: {seq_time:.2f}s")
    print(f"Threading: {thread_time:.2f}s")
    print(f"Multiprocessing: {mp_time:.2f}s")
    print(f"Speedup: {seq_time / mp_time:.1f}x")

if __name__ == "__main__":
    benchmark_methods()

Best Practices and Common Challenges

Recommended Approaches

  • Utilise Context Managers – Always engage with multiprocessing.Pool() to guarantee proper resource cleanup.
  • Guard Entry Point – Employ if __name__ == "__main__": to prevent recursive process creation.
  • Optimal Worker Count – Start with multiprocessing.cpu_count() but modify it based on specific workloads.
  • Minimise Data Transfer – Avoid passing large objects between processes to limit serialization overhead (a chunksize sketch follows this list).
  • Select Suitable Data Structures – Prefer multiprocessing.Queue for inter-process communication over standard queues.
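
On the data-transfer point, one simple lever is the chunksize argument accepted by Pool.map and Pool.imap, which batches work items into fewer, larger pickled messages. A rough sketch, with an illustrative square helper and item count (actual timings depend on your machine):

import multiprocessing
import time

def square(n):
    return n * n

if __name__ == "__main__":
    items = range(200_000)

    with multiprocessing.Pool() as pool:
        # imap sends one item per message by default (chunksize=1)
        start = time.time()
        list(pool.imap(square, items))
        print(f"imap, chunksize=1:    {time.time() - start:.2f}s")

        # Batching into larger chunks means far fewer round-trips between processes
        start = time.time()
        list(pool.imap(square, items, chunksize=5000))
        print(f"imap, chunksize=5000: {time.time() - start:.2f}s")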

Frequently Encountered Issues and Their Solutions

# Problem: Issues with shared state
# INCORRECT - This will not function as anticipated
counter = 0

def increment_counter():
    global counter
    counter += 1  # Each process holds its own copy of counter, so the parent never sees the change

# CORRECT - Utilise shared memory
def increment_shared(shared_counter, lock):
    # Defined at module level so it can be pickled when processes are spawned
    with lock:
        shared_counter.value += 1

def safe_counter_example():
    shared_counter = multiprocessing.Value('i', 0)
    lock = multiprocessing.Lock()

    processes = []
    for _ in range(4):
        p = multiprocessing.Process(target=increment_shared,
                                    args=(shared_counter, lock))
        p.start()
        processes.append(p)

    for p in processes:
        p.join()

    print(f"Final counter value: {shared_counter.value}")

# Problem: Pickling errors with intricate objects
# INCORRECT - Lambda functions cannot be pickled
# pool.map(lambda x: x * 2, numbers)  # This fails with a PicklingError

# CORRECT - Use conventional functions or functools.partial
from functools import partial

def multiply(x, factor):
    return x * factor

def pickle_safe_example():
    numbers = [1, 2, 3, 4, 5]
    multiply_by_3 = partial(multiply, factor=3)

    with multiprocessing.Pool() as pool:
        results = pool.map(multiply_by_3, numbers)

    return results

Memory Management and Process Oversight

import multiprocessing
import psutil
import os

def memory_intensive_task(size):
    """Create a substantial list to demonstrate per-process memory usage."""
    # Defined at module level so the pool can pickle it
    data = list(range(size))
    process = psutil.Process(os.getpid())
    memory_mb = process.memory_info().rss / 1024 / 1024
    return f"PID {os.getpid()}: Processed {size} items, Memory: {memory_mb:.1f}MB"

def monitor_process_resources():
    """Track memory and CPU usage of multiprocessing tasks."""
    sizes = [1000000, 2000000, 1500000, 3000000]

    print("Commencing multiprocessing with resource monitoring...")

    with multiprocessing.Pool(processes=2) as pool:
        results = pool.map(memory_intensive_task, sizes)

    for result in results:
        print(result)

# Process cleanup and comprehensive error handling
def worker_with_error_handling(item):
    try:
        # Simulate work that may fail
        if item % 7 == 0:
            raise ValueError(f"Item {item} caused an error")
        return item * 2
    except Exception as e:
        return f"Error processing {item}: {str(e)}"

def robust_multiprocessing_example():
    items = list(range(20))

    try:
        with multiprocessing.Pool(processes=4) as pool:
            results = pool.map(worker_with_error_handling, items)

        # Differentiate successful results from errors
        successful = [r for r in results if not isinstance(r, str) or not r.startswith("Error")]
        errors = [r for r in results if isinstance(r, str) and r.startswith("Error")]

        print(f"Successful: {len(successful)}, Errors: {len(errors)}")

    except KeyboardInterrupt:
        print("Process was interrupted by the user.")
    except Exception as e:
        print(f"Unexpected error: {e}")

if __name__ == "__main__":
    monitor_process_resources()
    robust_multiprocessing_example()

Advanced Strategies and Enhancements

Custom Process Pool with Progress Tracking

import multiprocessing
from multiprocessing import Pool
import time
import psutil
from tqdm import tqdm

def trackable_worker(args):
    """Worker function whose progress can be tracked."""
    item, delay = args
    time.sleep(delay)  # Simulate processing delay
    return f"Processed {item}"

def parallel_with_progress(items, num_workers=4):
    """Run multiprocessing with a progress bar."""
    # Prepare arguments (item, processing time)
    work_items = [(item, 0.1) for item in items]

    with Pool(processes=num_workers) as pool:
        # Use imap for lazy evaluation and progress tracking
        results = []
        with tqdm(total=len(work_items), desc="Processing") as pbar:
            for result in pool.imap(trackable_worker, work_items):
                results.append(result)
                pbar.update(1)

    return results

# Dynamic worker adjustment based on system load
def adaptive_multiprocessing(tasks, max_workers=None):
    """Adjust the worker count based on available system resources."""
    if max_workers is None:
        # Start with the CPU count but cap it by available memory
        available_memory_gb = psutil.virtual_memory().available / (1024 ** 3)
        max_workers = min(
            multiprocessing.cpu_count(),
            max(1, int(available_memory_gb / 0.5))  # Allot 0.5GB per worker
        )

    print(f"Using {max_workers} workers based on the available resources.")

    with multiprocessing.Pool(processes=max_workers) as pool:
        # cpu_intensive_task is the helper defined in the first example
        results = pool.map(cpu_intensive_task, tasks)

    return results

Integrating with Other Technologies

Multiprocessing complements several widely-used Python libraries:

# Integration with pandas for data processing
import pandas as pd
import numpy as np
from multiprocessing import Pool

def process_dataframe_chunk(chunk):
    """Process a segment of DataFrame."""
    # Example: Calculate rolling statistics
    chunk['rolling_mean'] = chunk['value'].rolling(window=5).mean()
    chunk['rolling_std'] = chunk['value'].rolling(window=5).std()
    return chunk

def parallel_dataframe_processing(df, num_chunks=4):
    """Process a large DataFrame in parallel segments."""
    # Split DataFrame into segments
    chunks = np.array_split(df, num_chunks)

    with Pool(processes=num_chunks) as pool:
        processed_chunks = pool.map(process_dataframe_chunk, chunks)

    # Combine outcomes
    result_df = pd.concat(processed_chunks, ignore_index=True)
    return result_df

# Example usage
if __name__ == "__main__":
    # Create a sample DataFrame
    data = pd.DataFrame({
        'value': np.random.randn(10000),
        'category': np.random.choice(['A', 'B', 'C'], 10000)
    })

    result = parallel_dataframe_processing(data)
    print(f"Processed DataFrame shape: {result.shape}")

For comprehensive information on Python’s multiprocessing capabilities, please refer to the official Python multiprocessing documentation. The concurrent.futures module offers a higher-level interface that is often more user-friendly for basic parallel processing tasks.
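
As a brief illustration of that higher-level interface, here is a minimal ProcessPoolExecutor sketch, roughly equivalent to the Pool.map examples above (the cube helper is illustrative):

from concurrent.futures import ProcessPoolExecutor

def cube(n):
    return n ** 3

if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=4) as executor:
        # executor.map mirrors Pool.map but returns results lazily
        results = list(executor.map(cube, range(10)))
    print(results)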

Multiprocessing is particularly effective for CPU-bound tasks such as mathematical computations, data processing, image manipulation, and scientific analysis. For I/O-bound operations, alternatives like asyncio or threading are often preferable. Understanding your workload’s characteristics is crucial to selecting the right tool.
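
For the I/O-bound case, a hedged sketch using a thread pool: the workers spend most of their time waiting on the network, during which the GIL is released, so threads are usually sufficient (the httpbin URL is just a placeholder endpoint):

from concurrent.futures import ThreadPoolExecutor
import requests

def fetch(url):
    # Threads suit I/O-bound work: the GIL is released while waiting on the network
    return url, requests.get(url, timeout=5).status_code

if __name__ == "__main__":
    urls = ['https://httpbin.org/delay/1'] * 10

    with ThreadPoolExecutor(max_workers=10) as executor:
        for url, status in executor.map(fetch, urls):
            print(url, status)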


