Before starting this post I advise that you should have already complete the RabbitMQ starting tutorial.
const sum = (a, b) => a+b;
sum(1,2); //should get 3
That’s all it takes to complete an easy task like adding up two numbers.
Seeing the example above you would say that it’s a waste of time to build an app that sends two numbers over the network and sometime in the future receive the result of that task. YES. That it absolutely true. On the other hand let’s think that we might have to send over the network a big file like for instance: a document that needs to be processed and converted to pdf, in this case our elementary app would need just some minor adjustments and will be ready to serve that task.
Start with the first step and make sure you’ve completed every step:
install Nodejs and NPM.
create a directory for your app and step into it
install ExpressJS
install amqplib
install socket.io
We will make use of ExpressJS framework for serving requests that will be made at http://whatever/api/calc/sum
then we use amqplib to establish a connection to our RabbitMQ server, create a queue where our request will be published. After a while the task gets executed and the result it’s emitted to our client using the socket.io library.
whatever/app.js
var express = require('express')
var bodyParser = require('body-parser')
var rabbitMQHandler = require('./connection')
var app = express()
var router = express.Router()
var server = require('http').Server(app)
var socketIO = require('socket.io')(server)
var calcSocket = socketIO.of('/calc')
rabbitMQHandler((connection) => {
connection.createChannel((err, channel) => {
if (err) {
throw new Error(err);
}
var mainQueue = 'calc_sum'
channel.assertQueue('', {exclusive: true}, (err, queue) => {
if (err) {
throw new Error(err)
}
channel.bindQueue(queue.queue, mainQueue, '')
channel.consume(queue.que, (msg) => {
var result = JSON.stringify({result: Object.values(JSON.parse(msg.content.toString()).task).reduce((accumulator, currentValue) => parseInt(accumulator) + parseInt(currentValue)) });
calcSocket.emit('calc', result)
})
}, {noAck: true})
})
})
app.use(bodyParser.urlencoded({ extended: true }))
app.use('/api', router)
router.route('/calc/sum').post((req, res) => {
rabbitMQHandler((connection) => {
connection.createChannel((err, channel) => {
if (err) {
throw new Error(err)
}
var ex = 'calc_sum'
var msg = JSON.stringify({task: req.body });
channel.publish(ex, '', new Buffer(msg), {persistent: false})
channel.close(() => {connection.close()})
})
})
})
server.listen(5555, '0.0.0.0',
() => {
console.log('Running at at localhost:5555')
}
)
whatever/connection/connection.js
var amqp = require('amqplib/callback_api')
module.exports = (callback) => {
amqp.connect('amqp://user:user@whateverhost:whateverport',
(error, conection) => {
if (error) {
throw new Error(error);
}
callback(conection);
})
}
whatever/connection/index.js
var rabbitMQHandler = require('./connection');
module.exports = rabbitMQHandler;
router.route('/calc/sum').post((req, res) => {
rabbitMQHandler((connection) => {
connection.createChannel((err, channel) => {
if (err) {
throw new Error(err)
}
var ex = 'calc_sum'
var msg = JSON.stringify({task: req.body });
channel.publish(ex, '', new Buffer(msg), {persistent: false})
channel.close(() => {connection.close()})
})
})
})
Using rabbitMQHandler and passing a callback function will give us a connection object which we use to creat a channel, then we will be sending the body of the request by publishing it on the queue. We publish this request further because we don’t want to keep the blocked until we process.
rabbitMQHandler((connection) => {
connection.createChannel((err, channel) => {
if (err) {
throw new Error(err);
}
var mainQueue = 'calc_sum'
channel.assertQueue('', {exclusive: true}, (err, queue) => {
if (err) {
throw new Error(err)
}
channel.bindQueue(queue.queue, mainQueue, '')
channel.consume(queue.que, (msg) => {
var result = JSON.stringify({result: Object.values(JSON.parse(msg.content.toString()).task).reduce((accumulator, currentValue) => parseInt(accumulator) + parseInt(currentValue)) });
calcSocket.emit('calc', result)
})
}, {noAck: true})
})
})
In the end, you can see how we are consuming the request that is popped out from the queue and then we send it using the socket stream.
Like we did in the 2.1 section, we need to install some packages in before continuing with the code
create a directory for your app
Install react - Install react-dom - Install socket.io-client - Install url-search-params - Install parcel
whatever/index.js
import React, { Component } from 'react';
import ReactDOM from 'react-dom';
import * as API from './api';
import URLSearchParams from 'url-search-params';
import './styles.css';
export default class Sum extends React.Component{
constructor(props){
super(props);
this.state = {
a: '',
b: '',
result: ''
};
this.handleChange = this.handleChange.bind(this);
this.handleSubmit = this.handleSubmit.bind(this);
}
componentDidMount() {
console.log(API);
API.subscribe(({result})=>{
this.setState({
result: result
})
});
}
handleChange(event) {
this.setState({
[event.target.name]: event.target.value}
);
}
handleSubmit(event) {
event.preventDefault();
const params = new URLSearchParams();
params.append('a', this.state.a);
params.append('b', this.state.b);
fetch(`${API.API_URL}/api/calc/sum`, { method: 'POST', body: params })
.then(res => res.json());
}
render() {
const result = this.state.result ? (
<label>
Result:
<input type="text" value={this.state.result} name='b' readOnly />
</label>
) : '';
return (
<form onSubmit={this.handleSubmit}>
<label>
A:
<input type="text" name='a' onChange={this.handleChange} />
</label>
<label>
B:
<input type="text" name='b' onChange={this.handleChange} />
</label>
{result}
<br/>
<input type="submit" value="Add" />
</form>
);
}
}
ReactDOM.render(
<Sum />,
document.getElementById("mainReact")
);
whatever/style.css
html,
body{
background-color: #f2f2f2;
}
#main-section{
margin: 0 auto;
display: flex;
justify-content: center;
align-items: center;
width: 80%;
height: 100%;
min-height: 24em;
border-radius: 5px;
}
input[type=text] {
padding: 12px 20px;
margin: 8px 0;
box-sizing: border-box;
border: 3px solid #ccc;
-webkit-transition: 0.5s;
transition: 0.5s;
outline: none;
border-radius: 5px;
}
input[type=text]:focus {
border: 3px solid #555;
}
input[type=submit] {
background-color: #4CAF50;
border: none;
color: white;
padding: 16px 32px;
text-decoration: none;
margin: 4px 2px;
cursor: pointer;
}
input[type=submit]:hover {
background-color: #45a049;
}
whatever/socket-api.js
import clientSocket from 'socket.io-client';
export const API_URL = "http://whatever:5555";
const socket = clientSocket(`${API_URL}/calc`);
export const subscribe = (newCallback) => {
socket.on("calc", (result) => {
result = JSON.parse(result);
newCallback(result);
});
}
whatever/index.html
```html
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<title>RabbitMQ Calc App</title>
<meta name="viewport" content="width=device-width, initial-scale=1.0">
</head>
<body class="top-margin">
<div class="container">
<main id="main-section">
<div id="mainReact">
</div>
</main>
<script src="src/index.js"></script>
</body>
</html>
handleSubmit(event) {
event.preventDefault();
const params = new URLSearchParams();
params.append('a', this.state.a);
params.append('b', this.state.b);
fetch(`${API.API_URL}/api/calc/sum`, { method: 'POST', body: params })
.then(res => res.json());
}
The function above is called when the form is submitted and then we are raising a request to our backend api for getting the result.
export const subscribe = (newCallback) => {
socket.on("calc", (result) => {
result = JSON.parse(result);
newCallback(result);
});
}
Here we are taking advantage of the javascript execution context by defining a callback that will be called at the time when our task is done.
Client -> parcel index.html
Backend -> node app.js