Menangani Antrian Task dengan Kue di Node.js

Ridwan Fajar 10 April 2017

Menangani Antrian Task dengan Kue di Node.js

Antrian atau queueing merupakan salah satu trik yang berguna sekali bila aplikasi web kita menerima request yang cukup banyak. Mengapa menggunakan antrian? karena ketika suatu eksekusi kode dilakukan bisa jadi terdapat proses yang cukup panjang untuk dieksekusi dan membuat response time terlalu lama sehingga user mendapatkan halaman balik dalam waktu cukup lama.

Belum lagi timeout pada suatu proses ini dapat mengakibatkan sejumlah kehilangan data yang tak terelakkan karena request yang datang tidak dapat dilayani oleh server. Banyak sekali solusi yang dapat digunakan untuk mengimplementasikan antrian dalam sistem aplikasi web kita. Hanya saja Kue lebih mudah dipasang di aplikasi web Node.js ketimbang menggunakan teknologi lain seperti Kafka dan RabbitMQ.

##Persiapan dan Instalasi

Sekarang kita akan melakukan instalasi terlebih dahulu untuk beberapa perangkat yang diperlukan (bagi yang sudah memasangnya silahkan lewati beberapa bagian instalasi dibawah ini):

$ apt-get install nodejs
$ apt-get install redis-server
$ apt-get install mongodb
$ mkdir node_kue
$ cd node_kue
$ npm install kue
$ npm install mongoose

Kemudian nyalakan server Redis di konsol pertama:

$ redis-server
3227:C 08 Apr 21:27:09.142 # Warning: no config file specified, using the default config. In order to specify a config file use redis-server /path/to/redis.conf
3227:M 08 Apr 21:27:09.145 * Increased maximum number of open files to 10032 (it was originally set to 4864).
                _._
           _.-``__ ''-._
      _.-``    `.  `_.  ''-._           Redis 3.2.6 (00000000/0) 64 bit
  .-`` .-```.  ```\/    _.,_ ''-._
 (    '      ,       .-`  | `,    )     Running in standalone mode
 |`-._`-...-` __...-.``-._|'` _.-'|     Port: 6379
 |    `-._   `._    /     _.-'    |     PID: 3227
  `-._    `-._  `-./  _.-'    _.-'
 |`-._`-._    `-.__.-'    _.-'_.-'|
 |    `-._`-._        _.-'_.-'    |           http://redis.io
  `-._    `-._`-.__.-'_.-'    _.-'
 |`-._`-._    `-.__.-'    _.-'_.-'|
 |    `-._`-._        _.-'_.-'    |
  `-._    `-._`-.__.-'_.-'    _.-'
      `-._    `-.__.-'    _.-'
          `-._        _.-'
              `-.__.-'

3227:M 08 Apr 21:27:09.155 # Server started, Redis version 3.2.6
3227:M 08 Apr 21:27:21.421 * DB loaded from disk: 12.266 seconds
3227:M 08 Apr 21:27:21.421 * The server is now ready to accept connections on port 6379

Kemudian nyalakan mongodb di konsol kedua

$ mongod
2017-04-08T22:20:47.873+0700 I CONTROL  [initandlisten] MongoDB starting : pid=3952 port=27017 dbpath=/data/db 64-bit host=MacBook-Air.local
2017-04-08T22:20:47.873+0700 I CONTROL  [initandlisten] db version v3.4.1
2017-04-08T22:20:47.873+0700 I CONTROL  [initandlisten] git version: 5e103c4f5583e2566a45d740225dc250baacfbd7
2017-04-08T22:20:47.873+0700 I CONTROL  [initandlisten] OpenSSL version: OpenSSL 1.0.2k  26 Jan 2017
2017-04-08T22:20:47.873+0700 I CONTROL  [initandlisten] allocator: system
2017-04-08T22:20:47.873+0700 I CONTROL  [initandlisten] modules: none
2017-04-08T22:20:47.873+0700 I CONTROL  [initandlisten] build environment:
2017-04-08T22:20:47.873+0700 I CONTROL  [initandlisten]     distarch: x86_64
2017-04-08T22:20:47.873+0700 I CONTROL  [initandlisten]     target_arch: x86_64
2017-04-08T22:20:47.873+0700 I CONTROL  [initandlisten] options: {}
2017-04-08T22:20:47.874+0700 I -        [initandlisten] Detected data files in /data/db created by the 'wiredTiger' storage engine, so setting the active storage engine to 'wiredTiger'.
2017-04-08T22:20:47.875+0700 I STORAGE  [initandlisten] wiredtiger_open config: create,cache_size=1536M,session_max=20000,eviction=(threads_max=4),config_base=false,statistics=(fast),log=(enabled=true,archive=true,path=journal,compressor=snappy),file_manager=(close_idle_time=100000),checkpoint=(wait=60,log_size=2GB),statistics_log=(wait=0),
2017-04-08T22:20:49.122+0700 I CONTROL  [initandlisten]
2017-04-08T22:20:49.122+0700 I CONTROL  [initandlisten] ** WARNING: Access control is not enabled for the database.
2017-04-08T22:20:49.122+0700 I CONTROL  [initandlisten] **          Read and write access to data and configuration is unrestricted.
2017-04-08T22:20:49.122+0700 I CONTROL  [initandlisten] ** WARNING: You are running this process as the root user, which is not recommended.
2017-04-08T22:20:49.122+0700 I CONTROL  [initandlisten]
2017-04-08T22:20:49.303+0700 I FTDC     [initandlisten] Initializing full-time diagnostic data capture with directory '/data/db/diagnostic.data'
2017-04-08T22:20:49.304+0700 I NETWORK  [thread1] waiting for connections on port 27017
2017-04-08T22:20:49.881+0700 I NETWORK  [thread1] connection accepted from 127.0.0.1:52873 #1 (1 connection now open)

Sekarang silahkan buat file dengan struktur berikut (abaikan node_modules karena direktori tersebut dibuat secara otomatis):

.
├── app.js
├── models
│   └── activity_log.js
└── worker.js

##Membuat Model

Disini kita akan membuat sebuah model untuk dipetakan kedalam collection di MongoDB. Kita akan membuat sebuah collection yang bernama activity log dimana kita dapat menyimpan berbagai activity log suatu user terhadap halaman yang dikunjunginya. Pura - puranya sih begitu.

Collection tersebut terdiri dari empat field dengan dua field tambahan created_at dan updated_at. Silahkan buat kode berikut di dalam file models/activity_log.js:

var mongoose = require('mongoose');
var Schema = mongoose.Schema;

var activityLogSchema = new Schema({
  title: { type: String, required: true},
  body: String,
  status: String,
  url: String,
},
{
    timestamps: true
});

var ActivityLog = mongoose.model('ActivityLog', activityLogSchema);

module.exports = ActivityLog;

Model ini akan digunakan di file worker.js.

##Membuat Task

Sekarang kita akan membuat kode untuk menangani task dengan mode cluster. Dimana kita akan memanfaatkan jumlah CPU yang dimiliki oleh server. Silahkan buat kode berikut di dalam file worker.js:

var kue = require('kue')
	, cluster = require('cluster')
  	, jobs = kue.createQueue();

var mongoose = require('mongoose');

mongoose.Promise = global.Promise;
mongoose.connect('mongodb://localhost/node_kue');

var ActivityLog = require('./models/activity_log.js');
var clusterWorkerSize = require('os').cpus().length;

if (cluster.isMaster) {

  	// start the UI
	kue.app.listen( 3000 );
	console.log( 'UI started on port 3000' );

	for (var i = 0; i < clusterWorkerSize; i++) {
	    cluster.fork();
	}
} else {

	// Consumer / Worker for jobs testing

	jobs.process( 'activity_log', 10, function ( job, done ) {
	  console.log( 'Starting ' + job.data.title );

	  console.log("Execute activity_log jobs...");

	 	var activity_log = new ActivityLog({
						title: job.data.title ,
						body:job.data.body ,
						status:job.data.status ,
						url:job.data.url
					});

		activity_log.save(function(err) {
			if (err)
			{
				console.log(err);
			}
			else
			{
				setTimeout( function () {
					console.log( 'Finished activity log jobs: ' + job.data.title );
					done();
			  	}, 100 );
			}
		});
	});

	jobs.process( 'testing', 4, function ( job, done ) {
	  console.log( 'Starting ' + job.data.title );

	  console.log("Execute testing jobs...");

	  setTimeout( function () {
		console.log( 'Finished ' + job.data.title );
		done();
	  }, 1000 );
	});

}

Pada kode diatas kita membuat sebuah objek Kue dan melakukan inisialisasi cluster. Kemudian jika cluster adalah master maka jalankan Kue server, sedangkan jika child maka buat instance untuk menangani task yang akan datang ke Kue. Sehingga bila ada suatu jobs dengan konkurensi 10 dan jumlah core suatu server adalah 4 maka total konkurensi akan menjadi 40.

Pada kode diatas, ada dua jobs. Yaitu jobs testing dengan konkurensi 4 yang hanya menampilkan stdout di konsol dan diberi delay 1000 ms. Lalu ada jobs yang menyimpan data ke MongoDB. dimana dia memiliki konkurensi 10 dan delay selama 100 ms. File ini nantinya akan digunakan di app.js.

Sekarang silahkan nyalakan worker.js di konsol lainnya:

$ node worker.js

##Membuat Kode Aplikasi

Sekarang kita akan membuat kode untuk menggunakan jobs yang telah didefinisikank di worker.js. Penggunaannya cukup sederhana dan tidak terlalu membuat kita berhenti menggunakannya. Cukup memanggil create() dan lewatkan nama jobs serta parameternya.

var kue = require('kue')
  , jobs = kue.createQueue();

// Producer for jobs testing

for ( var i = 0; i < 10; i++ ) {
	console.log( 'Creating Job #' + i );

	jobs.create( 'testing', {
	  title: 'jobs #' + i
	}).save();
}

for ( var i = 0; i < 100; i++ ) {
	console.log( 'Creating Activity Job #' + i );

	jobs.create( 'activity_log', {
	  	title: 'visited_item',
	  	body: 'lorem ipsum sit dolor amet- '+ i,
	  	status:'200',
		url:'http://www.example.com/item/mouse-imac-1j1h2j4h12'
	}).save();
}

Untuk menjalankan kode demonya silahkan jalankan di konsol lain:

$ node app.js

##Monitoring Task di Kue UI

Untuk monitoring, kamu dapat membukanya di http://localhost:3000. Disana kamu dapat melihat lima tab yang terdiri dari Inactive, Active, Delayed, Failed, Complete. Tanpa me-refresh halamannya, kamu dapat melihat update secara langsung karena sudah menggunakan websocket. Kamu pun dapat melihat hasilnya melalui mongo client dan silahkan hitung jumlahnya apakah sesuai dengan yang dikirimkan ke jobs atau tidak:

$ mongo
> use node_kue;
> db.activitylogs.find().length();
10000
>
Image
Image
Image

##Mencoba Mengubah Parameter Kue

Sekarang kita pun ingin mencoba mengubah parameter konkurensi activity_logs jobs menjadi 500 dan mengeksekusi 50000 task yang dikirimkan ke Kue.

Image

Sekarang kita pun ingin mencoba mengubah parameter konkurensi activity_logs jobs menjadi 1000 dan mengeksekusi 50000 task yang dikirimkan ke Kue.

Image

Pada kedua kasus, cenderung sama, namun kita dapat mengetahui bahwa semakin banyak konkurensi tentu saja Kue akan semakin bekerja lebih cepat. Namun tentunya kita harus mengetahui batas maksimalnya.

##Kesimpulan

Menggunakan queueing mungkin sangat membantu apalagi Node.js sendiri merupakan salah satu bahasa pemrograman yang bersifat asinkronus. Tentu saja hal ini menjadi dua keuntungan besar bila kita menggunakan Node.js dan Kue. Hanya saja kita perlu memperhatikan beberapa hal dalam penggunaan Kue ini.

  • Sebaiknya tidak ada kode yang berat di dalam suatu task, bila ingin ada kode yang berat lebih baik dipecah lagi jadi task lain
  • Bila ingin menggunakan cluster mode perhatikan jumlah core yang ada di server terlebih dahulu
  • Uji nilai maksimal konkurensi yang bisa ditangani oleh server jangan sampai kita mengatur konkurensi terlalu tinggi sehingga tidak dapat dieksekusi oleh Kue
  • Debug-lah sebelum task Kue di-deploy agar tidak banyak message yang akan di-retry oleh Kue.

Sayangnya saat ini Kue belum memiliki mekanisme untuk scaling bila suatu saat kapasitas dari server yang dipasang Kue mengalami lonjakan penanganan task, hal lain yang bisa terjadi adalah ketika antrian sangat banyak dan menumpuk sehingga proses eksekusi antrian terbaru dapat menunggu lebih lama. Capacity planning sangat diperlukan pada kasus ini.