Creating a Message Queue in PHP Without External Libraries

Utilizing a message queue is a key strategy for creating distributed, high-throughput web services. A message queue allows "work" to be placed in a queue. Separate processes, sometimes referred to as workers, grab messages from the queue and process them according to the needs of the application. An example of this could be a eCommerce site that sells widgets. When a customer completes checkout, an order would be placed in the queue. At the same time, worker threads would be looking through the queue for new orders and when they find one, they would send out the email confirmation to the customer, decrement the items in stock database, and send a notice to the shipping center to ship out the widget to the customer. This separation of duties (and processes) allows many more order to be processed by adding a point of horizontal scaling to the application.

There are many message queue software packages available today across many platforms and languages, and several good ones available in PHP, such as Gearman. PHP natively provides message queue functions in its Semaphore Functions, and creating a working queue is a fairly easy process. Below is an illustrative example of a native PHP message queue that you can use as a basis for a queue in your web application or service. You'll need PHP 4.3+ on Mac or Linux (this extension is not available on Windows) and access to the crontab if you want to make sure your workers process jobs forever. PHP's native message queue works by storing messages in shared memory, and is accessible to other processes on your machine. In PHP's words, "They provide a simple and effective means of exchanging data between processes." If I was developing a large, enterprise message queue, I would not use these functions, as they don't allow workers and job servers to be managed across multiple servers. For smaller projects and tinkering around, PHP's functions are a great way to understand how message queues work and to process a small amount of requests.

Message Queue Layout

There are three main components to our queue:

  1. An add_to_queue.php script which will add items to our queue.
  2. A worker.php script which will grab items from the queue and process them.
  3. A manager.sh shell script that will make sure there are always worker.php threads running.

Add to Queue Script

<?php
// filename: add_to_queue.php

//creating a queue requires we come up with an arbitrary number
define('QUEUE', 21671);

//add message to queue
$queue = msg_get_queue(QUEUE);

// Create dummy message object
$object = new stdclass;
$object->name = 'foo';
$object->id = uniqid();

//try to add message to queue
if (msg_send($queue, 1, $object)) {
        echo "added to queue  \n";
        // you can use the msg_stat_queue() function to see queue status
        print_r(msg_stat_queue($queue));
}
else {
        echo "could not add message to queue \n";
}

First we create our queue using an arbitrary number to identify the queue. Next, we create an object to send to our worker script. This could be data for an order needing to be processed, a missile needing to be fired, or whatever data the worker script needs to do its job. Next we call the msg_send() function to add the item to our queue.

Worker Script

<?php
// filename: worker.php

//getting our requires we supply the id number we created it with
define('QUEUE', 21671);

$queue = msg_get_queue(QUEUE);

$msg_type = NULL;
$msg = NULL;
$max_msg_size = 512;

while (msg_receive($queue, 1, $msg_type, $max_msg_size, $msg)) {
        echo "Message pulled from queue - id:{$msg->id}, name:{$msg->name} \n";
        
       //do your business logic here and process this message!

       //finally, reset our msg vars for when we loop and run again
      $msg_type = NULL;
      $msg = NULL;
}

In our worker script, we get the queue we created in our add_to_queue.php script using msg_get_queue(). Before running msg_receive(), we create empty vars $msg and $msg_type that will hold the values of the message retrieved from the queue. After running msg_receive(), $msg will hold the object we sent to the queue in our worker script. See the official PHP documentation for an explanation of $msg_type, which can be safely set to 1 for the purposes of our exercise. The $max_msg_size argument of msg_receive is required and specifies the maiximum size of the returned message. Set this to something reasonable for the data you're sending through your queue. The msg_receive() function will block until it finds a message in the queue before continuing through the script. This blocking behavior can be changed by passing additional flags to the msg_receive() function.

Our Manager Shell Script

You can run the above add_to_queue.php and worker.php above in separate shell windows and watch items be added to the queue and be processed. This is great for a demo, but not robust enough if we actually wanted to use this in an application. We need a way of guaranteeing that our worker.php scripts keep running even if they encounter a fatal error or time out. To do this, we're going to create a manager.sh shell script. This shell script will keep 5 instances of worker.php running every time it runs (but no more than 5), and we will then add this to our system cron to be executed every minute.

#!/bin/sh
# filename: manager.sh

PROCESSORS=5;
x=0

while [ "$x" -lt "$PROCESSORS" ];
do
        PROCESS_COUNT=`pgrep -f process.php | wc -l`
        if [ $PROCESS_COUNT -ge $PROCESSORS ]; then
                exit 0
        fi
        x=`expr $x + 1`
        php -f /home/ebrueggeman/worker.php &
done
exit 0

Our script loops calling up to 5 worker.php scripts (located for this example in /home/ebrueggeman/). Every time a loop is run, it counts to make sure that no more than 5 workers are actively running. If there are more than five, the script exists.

To add this manager script to our crontab to fire every minute, you'll want to add a line to your cron that looks like this if we kept our manager script in my /home/ebrueggeman/ folder:

* * * * * sh /home/ebrueggeman/manager.sh

For must systems, type crontab -e as root to edit your crontab.

Completion

You should now be able to continually run the add_to_queue.php script and watch as your manager and workers process jobs. Message queue complete!