Membuat Aplikasi Web Realtime dengan Django, RabbitMQ dan Vue.js Bagian 5: uWSGI WebSockets

Bagus Aji Santoso 9 Februari 2018

Membuat Aplikasi Web Realtime dengan Django, RabbitMQ dan Vue.js Bagian 5: uWSGI WebSockets

Artikel ini merupakan terjemahan seri tutorial Real time Chat application built with Vue, Django, RabbitMQ and uWSGI WebSockets bagian kelima yang ditulis oleh Osaetin Daniel.

Daftar isi:

Akhirnya kita sampai di bagian pamungkas tutorial ini. Selama menulis tutorial ini penulis mendapatkan banyak hal dan penulis harap pembaca juga sama.

Di bagian 4, kita sudah mencapai tujuan utama tutorial ini yaitu membangun aplikasi web chat dengan django dan Vue. Akan tetapi kita menemukan masalah saat akan melakukan scaling aplikasi.

WebSockets

Dari tutorial terakhir, kita membahas sebentar apa itu WebScoket, apa itu bi-directional connection yang akan terus terhubung sehingga memungkinkan server berkomunikasi dengan klien dan sebaliknya.

Django sendiri dikembangkan saat aplikasi web/website tidak serumit sekarang. Pada saat itu, server hanya akan merespon apa yang diminta. Misalnya kita bisa meminta "Halo mang server, saya mau artikel dari tanggal 1 Januari 2005" lalu servernya membalas "Siap bos, bentar yak!" lalu si server mengambil artikel yang diminta dan memberikan respon "Nih bos artikel yang diminta" lalu server tidur lagi untuk menunggu seseorang meminta rikues baru.

Apa yang bisa kita simpulkan dari contoh di atas adalah, komunikasi yang terjadi hanya diinisiasi oleh klien.

Ini masalah yang dihadapi oleh Django dan web framework Python lainnya karena sistem yang mereka pakai (WSGI) sudah terikat dengan pola komunikasi request-response seperti ini.

Oleh karena itu banyak orang mencoba menyelesaikannya dengan mencari solusi lain diantaranya:

  • Menggunakan framework/server web baru (Twisted, Tornado, dll.).
  • Menggunakan async engine dengan server websocket (gevent).
  • Menambah fitur ke server WSGI yang sudah ada (uWSGI).

Memodifikasi django secara langsung sulit dilakukan karena ia membutuhkan perubahan yang cukup drastis di sistem utamanya dimana django sudah terikat dengan sifatnya yang synchronous dengan protol WSGI.

Django channels

Andrew Godwin membawa websockets ke django melalui django-channels. Saat tutorial ini ditulis, pustaka ini menjadi project resmi dari django software foundation. Artinya ia tidak akan ditinggalkan dalam waktu dekat.

django-channels memperkenalkan protokol baru bernama ASGI yang berbeda dari WSGI. django-channels datang dengan servernya sendiri bernama Daphne. Daphne bisa menangani koneksi http biasa atau koneksi WebSocket.

Jika memutuskan untuk menggunakan django-channels kita harus mempelajari APi dan method-methodnya. Kita juga harus mengubah proses deployment.

Untuk melakukan scale seacara horizontal ke beberapa mesin kita membutuhkan apa yang disebut oleh django-channels sebagai layer Channel. Layer yang direkomendasikan adalah layer Redis. Adalah juga layer channel RabbitMQ juga sebuah IPC (Inter Procses Communication) Layer. Layer Channel ini adalah perekat antara django dengan server Daphne. Redis dan RabbitMQ biasanya dipakai untuk melakukan scale channel secara horizontal. Layer IPC channel lebih cepat tapi hanya cocok untuk satu server karena semua proses menggunakan sebuah shared memory untuk komunikasinya.

Ada beberapa kekurangan menggunakan channel Redis. Redis tidak memiliki dukungan TLS secara native dan dukungannya untuk Persistent queues tidak sebaik RabbitMQ.

Selain itu, karena ASGI Specification, django-channels mengemulasi Pub/Sub (ia tidka memakai Pub/Sub dari Redis atau RabbitmQ) yang kurang bagus jika kita perlu mendengarkan sebuah channel secara langsung.

Kita akan membangun sistem yang mirip dengan django-channels tapi dengan level yang lebih rendah. Kita akan membaca queue RabbitMQ secara langsung. uWSGI akan mengambil peran yang mirip seperti server Daphne.

Perbedaan antara django-channels dan pendekatan yang kita lakukan adalah kita tidak terbatas membuat satu server WebSocket. Kita bisa saja mengganti uWSGI dengan server WebSocket lain tanpa kesulitan yang berarti.

uWSGI WebSockets

unbit (developers uWSGI) mengambil pendekatan yang berbeda, mereka memutuskan untuk mengintegrasikan WebSockets langsung kedalam uWSGI Core. uWSGI merupakan server Web WSGI dengan performa yang cukup baik. Bisa dibilang ia merupakan Server WSGI paling populer. Ia juga mendukung beberapa bahasa pemrograman seperti Perl, Ruby bahkan Go.

Jika pembaca membutuhkan WebSockets tapi sudah menggunakan uWSGI, pembaca tidak perlu mengubah apapun. Bahkan jika menggunakan Server WSGI lain seperti gunicorn, kita hanya perlu menjalankan pip install uswgi.

Jika pembaca masih ingat diskusi kita diawal bagian 3 tentang RabbitMQ. Ingat dimana penulis mengatakan bagaimana RabbitMQ menjadi perekat antara uWSGI dengan django.

Kita perlu membuat notifikasi dan mengirimkannya ke Queue RabbitMQ dan melalui websocket pesan ini di broadcast langsung ke beberapa user.

Untuk memudahkan proses pembuatan notifikasi dan pengirimannya ke RabbitMQ, penulis membuat sebuah pustaka django bernama django-notifs.

Pasang dari PyPI.

pip install django-notifs

Lalu tambahkan ke INSTALLED_APPS:

INSTALLED_APPS = (
    'django.contrib.auth',
    ...,
    'rest_framework',
    'rest_framework.authtoken',
    'djoser',

    # Our apps
    'chat',
    'notifications'
)

Memasang django-notifs juga akan memasang pika, pustaka untuk terhubung ke RabbitMQ.

Lakukan migrasi dengan perintah python manage.py migrate

Terakhir pasang RabbitMQ. Panduannya berbeda untuk tiap sistem operasi, jadi langsung saja ke halaman pemasangan untuk mengikuti panduannya.

Sebelum melanjutkan, pastiakn bahwa server RabbitMQ sudah berjalan, jika tidak kita akan mendapat error dari pika.

Buka views.py dan ubah isi view ChatSessionMessageView:

from notifications.signals import notify


class ChatSessionMessageView(APIView):
    ...

    def post(self, request, *args, **kwargs):
        """create a new message in a chat session."""
        uri = kwargs['uri']
        message = request.data['message']

        user = request.user
        chat_session = ChatSession.objects.get(uri=uri)

        chat_session_message = ChatSessionMessage.objects.create(
            user=user, chat_session=chat_session, message=message
        )

        notif_args = {
            'source': user,
            'source_display_name': user.get_full_name(),
            'category': 'chat', 'action': 'Sent',
            'obj': chat_session_message.id,
            'short_description': 'You a new message', 'silent': True,
            'extra_data': {'uri': chat_session.uri}
        }
        notify.send(sender=self.__class__, **notif_args)

        return Response ({
            'status': 'SUCCESS', 'uri': chat_session.uri, 'message': message,
            'user': deserialize_user(user)
        })

Tepat sebelum kita mengembalikan sebuah respon ke user, kita mengirim sinyal notifikasi.

Parameter silent artinya notifikasi tidak disimpan ke database. Dengan kata lain, kita menggunakan django-notifs seperti event emitter. Kita juga bisa mengirim data di notifikasi menggunakan dictionary sebagai argumen extra_data.

Notification channels

Django-notifs menggunakan channels untuk mengirim pesan. Itu artinya kita bisa menulis custom channel untuk mengirimkan pesan lewat email, SMS, Slack, atau apapun.

Kita ingin mengirim pesan broadcast ke beberapa klien secara bersamaan. Pola komunikasi ini bernama Pub/Sub (Publish/Subscribe) dan RabbitMQ memiliki fitur ini sebagai exchanges.

Sebuah exchange adalah channel yang menerima pesan dari producer (aplikasi kita) lalu mem-broadcast-nya ke beberapa queues. Ada empat tipe exchanges yaitu direct, topic, headers dan fanout. Kita akan menggunakan fanout karena yang paling mudah dipahami dan cocok dipakai untuk kasus kita .

Berikut ilustrasi dari dokumentasi RabbitMQ tentang bagaimana fanout bekerja:

RabbitMQ Fanout

Sebelum sebuah queue menerima sebuah pesan ia harus di bound ke exchange (funout).

Untuk menerapkan pola Pub/Sub kita perlu menulis delivery channel sendiri.

Buat sebuah file dan beri nama channels.py

"""Notification channels for django-notifs."""

from json import dumps

import pika

from notifications.channels import BaseNotificationChannel


class BroadCastWebSocketChannel(BaseNotificationChannel):
    """Fanout notification for RabbitMQ."""

    def _connect(self):
        """Connect to the RabbitMQ server."""
        connection = pika.BlockingConnection(
            pika.ConnectionParameters(host='localhost')
        )
        channel = connection.channel()

        return connection, channel

    def construct_message(self):
        """Construct the message to be sent."""
        extra_data = self.notification_kwargs['extra_data']

        return dumps(extra_data['message'])

    def notify(self, message):
        """put the message of the RabbitMQ queue."""
        connection, channel = self._connect()

        uri = self.notification_kwargs['extra_data']['uri']

        channel.exchange_declare(exchange=uri, exchange_type='fanout')
        channel.basic_publish(exchange=uri, routing_key='', body=message)

        connection.close()

Kita mengatur nama exchange sesuai dengan uri sebuah sesi chat.

Kita juga menyimpan pesan chat dalam sebuah dictionary. Kita memerlukan seluruh data tentang pesan-pesan tersebut bukan hanya teks pesannya saja.

Coba buat dan kirim pesan baru, maka akan ada RabbitMQ exchange berdasarkan uri dari sesi chat yang baru.

Untuk melihat daftar exchanges, jalankan perintah ini di terminal (untuk sistem *nix):

rabbitmqctl list_exchanges
Listing exchanges
amq.match	headers
amq.direct	direct
amq.rabbitmq.log	topic
amq.rabbitmq.trace	topic
amq.topic	topic
	direct
amq.fanout	fanout
amq.headers	headers
fe662fd9de834fc	fanout  # our Exchange

Pembaca juga bisa melihat beberapa exchanges bawaan.

Sekarang kita akan membuat queue secara dinamis dan menautkannya ke exchange yang sudah kita buat sebelumnya sehingga mereka bisa menerima pesan tersebut.

Buat file baru bernama websocket.py.

"""Receive messages over from RabbitMQ and send them over the websocket."""

import pika


connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost')
)
channel = connection.channel()

channel.exchange_declare(
    exchange='fe662fd9de834fc', exchange_type='fanout'
)

# exclusive means the queue should be deleted once the connection is closed
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue  # random queue name generated by RabbitMQ

channel.queue_bind(exchange='fe662fd9de834fc', queue=queue_name)

print('listening for messages...')

while True:
    for method_frame, _, body in channel.consume(queue_name):
        try:
            print(body)
        except OSError as error:
            print(error)
        else:
            # acknowledge the message
            channel.basic_ack(method_frame.delivery_tag)

Sekali lagi, setelah terhubung ke RabbitMQ menggunakan pika kita mendeklarasikan exchange.

Mendeklarasikan sebuah exchange (atau queue) beberapa kali tidak memiliki efek tambahan, jika exchange ada sebelum RabbitMQ maka exchange tersebut tidak melakukan apapun.

Mari lihat baris berikut lebih detail:

channel.queue_bind(exchange='fe662fd9de834fc', queue=queue_name)

Baris ini menautkan queue ke exchange. Yang dilakukannya seperti "Hei exchange, saya tertarik dengan pesan yang kamu terima. Kirimin ke saya dong."

queue_name dibuat secara random oleh RabbitMQ karen akita memanggil queue_declare tanpa membuat nama.

Ada beberapa cara untuk mengonsumsi pesna dari channel. Kita bisa menggunakan callback atau mengonsumsinya secara manual dengan for loop. Kita akan menggunakan opsi yang kedua sehingga dapat meng-handle exception yang muncul saat mengirim pesan ke klien. Akan lebih jelas alasan mengapa memiliih teknik yang kedua saat kita mengimplementasi WebSocket.

channel.basic_ack(method_frame.delivery_tag) memastikan bahwa klien sudah menerima pesan sehingga pesna tersebut bisa di hapus dari queue. Jika pesan belum dipastikan, ia akan tetap ada di dalam queue sampai queue tersebut dihapus.

Sekarang jalankan file websocket dengan perintah:

$ python websocket.py
listening for messages...

Kembali ke UI aplikasi chat dan kirimkan beberapa pesan. Penulis mengirim "hello world" dan "how are you doing" lalu mendapat pesan sebagai berikut:

listening for messages...
b'{"user": {"id": 1, "username": "danidee", "email": "", "first_name": "", "last_name": ""}, "message": "Hello world"}'
b'{"user": {"id": 12, "username": "daniel", "email": "", "first_name": "", "last_name": ""}, "message": "How are you doing"}'

Buka sebuah terminal baru, jalankan file websocket dan kirimkan lagi pesan dari aplikasi chat. Seharusnya pesan-pesan baru juga akan muncul.

Terakhir, kita akan mengirim pesan langsung ke user.

Dimana websocket-nya?

Websocket sudah ada di dalam objek Python uwsgi. Jadi, pasang dulu uWSGI jika belum dilakukan.

$ pip install uwsgi

Kita akan membuat beberapa modifikasi ke file websocket.py

"""Receive messages over from RabbitMQ and send them over the websocket."""

import sys

import pika
import uwsgi


def application(env, start_response):
    """Setup the Websocket Server and read messages off the queue."""
    connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost')
    )
    channel = connection.channel()

    exchange = env['PATH_INFO'].replace('/', '')

    channel.exchange_declare(
        exchange=exchange, exchange_type='fanout'
    )

    # exclusive means the queue should be deleted once the connection is closed
    result = channel.queue_declare(exclusive=True)
    queue_name = result.method.queue  # random queue name generated by RabbitMQ

    channel.queue_bind(exchange=exchange, queue=queue_name)

    uwsgi.websocket_handshake(
        env['HTTP_SEC_WEBSOCKET_KEY'],
        env.get('HTTP_ORIGIN', '')
    )

    def keepalive():
        """Keep the websocket connection alive (called every 30 seconds)."""
        print('PING/PONG...')
        try:
            uwsgi.websocket_recv_nb()
            connection.add_timeout(30, keepalive)
        except OSError as error:
            connection.close()
            print(error)
            sys.exit(1)  # Kill process and force uWSGI to Respawn

    keepalive()

    while True:
        for method_frame, _, body in channel.consume(queue_name):
            try:
                uwsgi.websocket_send(body)
            except OSError as error:
                print(error)
                sys.exit(1)  # Force uWSGI to Respawn
            else:
                # acknowledge the message
                channel.basic_ack(method_frame.delivery_tag)

API websocket uwsgi cukup sederhana. Kita hanya akan menggunakan tiga method:

  • uwsgi.websocket_handshake: Menjadi jembatan antara HTTP dengan protokol WS. Method ini membangun koneksi antara klien dengan server, jika gagal maka akan ada exception.
  • uwsgi.websocket_recv_nb: Meski nama lengkap method-nya adalah websocket receive non blocking ia sebetulnya tidak hanya menerima pesan dalam bentuk non-blocking tapi juga membantu menjaga koneksi dengan klien dengan mengirimkan pong ke browser (sebuah mekanisme untuk memeriksa apakah klien masih aktif). Untuk menjaga koneksi, method ini akan dipanggil setiap 30 detik, jika tidak koneksi klien dapat terputus.
  • uwsgi.websocket_send: Fungsi method ini sudah cukup jelas dari namanya. Alasan kita memberikan error handler karena saat koneksi ditutup dan kita mencoba mengirim pesan dengan uwsgi.websocket_send, akan muncul OSError. Lalu kita menutup koneksi ke RabbitMQ dan mematikan process-nya.uWSGI akan melakukan proses restart untuk kita. Karena terjadi error, maka isi blok else tidak akan dipanggil sehingga pesan tetap ada di queue. Jika tidak terjadi error, blok else akan dipanggil yang artinya pesan sudah dikirim dan akan dihapus dari queue.

Saat kita masuk lagi ke loop berikutnya dan memanggil channel.consume, ia akan mengirim pesan yang masih unacknowledged ditambah pesan-pesan baru di queue. Artinya, kita tidak akan kehilangan pesan yang gagal dikirim karena masalah koneksi.

Apakah pembaca memperhatiakn bahwa uri exchange tidak lagi di hardcoded? Kita mengambil namanya dari koneksi URL yang akan diakses oleh klien seperti ini:

http://websocket-server/<uri>

Jangan risau jika belum jelas. Nanti saat kita menghubungi frontend Vue ke server WebSocket, hal-hal yang sudah kita lakukan akan mulai terlihat jelas.

Menghubungi WebSocket dengan JavaScript

Dalam konteks aplikasi web, tidak mungkin menghubungi WebSocket tanpa JavaScript. Sebagian besar browser sudah memiliki dukungan WebSocket sehingga tidak ada aplikasi tambahan yang harus kita pasang.

Mari perbarui komponen Chat:

<script>
const $ = window.jQuery

export default {
  ...

  created () {
    this.username = sessionStorage.getItem('username')

    // Setup headers for all requests
    $.ajaxSetup({
      headers: {
        'Authorization': `Token ${sessionStorage.getItem('authToken')}`
      }
    })

    if (this.$route.params.uri) {
      this.joinChatSession()
    }

    this.connectToWebSocket()
  },

  methods: {
    ...

    postMessage (event) {
      const data = {message: this.message}

      $.post(`http://localhost:8000/api/chats/${this.$route.params.uri}/messages/`, data, (data) => {
        this.message = '' // clear the message after sending
      })
      .fail((response) => {
        alert(response.responseText)
      })
    },

    joinChatSession () {
      ...
    },

    fetchChatSessionHistory () {
     ...
    },

    connectToWebSocket () {
      const websocket = new WebSocket(`ws://localhost:8081/${this.$route.params.uri}`)
      websocket.onopen = this.onOpen
      websocket.onclose = this.onClose
      websocket.onmessage = this.onMessage
      websocket.onerror = this.onError
    },

    onOpen (event) {
      console.log('Connection opened.', event.data)
    },

    onClose (event) {
      console.log('Connection closed.', event.data)

      // Try and Reconnect after five seconds
      setTimeout(this.connectToWebSocket, 5000)
    },

    onMessage (event) {
      const message = JSON.parse(event.data)
      this.messages.push(message)
    },

    onError (event) {
      alert('An error occured:', event.data)
    }
  }
}
</script>

Lalu mulai Server WebSocket uWSGI di port 8081 dan refresh browser.

$ uwsgi --http :8081 --module websocket --master --processes 4

Hore!

Seharusnya sekarang kita sudah bisa mengirim pesan dan melihat hasilnya secara realtime.

Websockets for everyone

Masih ada satu masalah. Buka tiga tab baru (5 klien aktif). Klien terakhir tidak akan bisa terhubung karena 4 proses yang sudah kita atur telah dipakai klien lain.

CATATAN: Itulah alasan kenapa kita perlu mematikan proses yang macet dengan sys.exit(1). Proses yang macet itu belum tentu karena sistem atau koneksi, bisa saja karena user sengaja meninggalkan chat room sehingga membuat uWSGI harus menunggu beberapa saat sebelum menutup koneksi di server.

Opsi –-master akan memanggil master process yang akan memonitor proses-proses yang macet. Tanpanya, proses yang macet akan terus ada dan tidak pernah direstart sampai uWSGI dimatikan.

Asynchronous IO dan Concurrency

Mudahnya ide mengapa kita membutuhkan AsyncIO atau cukup async saja adalah saat kita memiliki beberapa IO bound tasks yang perlu dijalankan. Saat operasi IO (dalam kasus kita, mengirim dan menerima pesan), daripada menunggu pesan baru, suatu proses bisa berganti IO bound task dan menjalankannya.

Konsep sederhana ini yang membuat NodeJS unggul. Untuk Python dan uWSGI agak berbeda karena memang tidak didesain sebagai asynchronous. Ada beberapa pustaka async untuk Python. Pustaka resmi asyncio, gevent, curio dll. uWSGI sendiri mendukung beberapa pustaka tadi, tapi kita akan memakai gevent.

Dari pengalaman, penulis menemukaan bahwa gevent bekerja lebih baik dibanding asyncio dan uGreen. Gevent juga memiliki banyak method yang berguna. Milsanya monkey.patch_all yang akan mengganti sebagian besar pustaka standar dengan pustakanya gevent sehingga bisa menulis kode synchronous yang akan dieksekusi secara asynchronous.

Pasang gevent dengan pip:

$ pip install gevent

Jalankan Server WebSocket uWSGI:

$ uwsgi --http :8081 --gevent 2 --module websocket --gevent-monkey-patch --master

Kita memulai server dengan 2 thread gevent dan sebuah process. Itu artinya kita dapat meng-handle dua klien (naik turun antara 3 sampai 4 klien secara random, tapi 2 klien bisa menerima pesan secara pasti), saat ada lebih dari jumlah klien yang mampu dilayani kita akan mendapatkan pesan di uWSGI:

async queue is full !!!

Ada dua cara untuk menyelesaikan permasalahan ini. Cara yang paling mudah ialah dengan menaikkan jumlah thread gevent. Jika kita mengganti kode uWSGI seperti berikut ini, maka kita dapat meng-handle 100 user bersamaan.

$ uwsgi --http :8081 --gevent 100 --module websocket --gevent-monkey-patch --master

Lalu bagaimana jika ingin ada beberapa proses?

Kita juga bisa menaikkan jumlah proses lebih dari satu, misalnya di perintah berikut ini kita akan memulai server uWSGI dengan 4 proses.

$ uwsgi --http :8081 --gevent 100 --module websocket --gevent-monkey-patch --master --processes 4

Dengan menaikkan jumlah proses, kita melipatgandakan jumlah klien yang dapat dilayani sebesar 4 * 100!

Bergantung pada spesifikasi server dan konfigurasinya, kita bisa menaikkan jumlah proses dan thread gevent sesuka hati. Tapi sebelumnya, pastikan sudah memonitor performa aplikasi karena semakin besar angka yang kita tulis semakin besar pula sumber daya yang akan dipakai yang artinya performa bisa menurun.

uWSGI memiliki paket python bernama uwsgitop yang bisa dipakai untuk melakukan monitoring (seperti aplikasi top kalau di Linux).

Scaling ke beberapa server

Suatu saat kita bisa memakai sumber daya server secara maksimal dan perlu melakukan scaling ke beberapa server. Karena server websocket kita tidak terikat dengan aplikasi django, maka proses ini bisa kita lakukan dengan lebih mudah karena bisa melakukan load balance beberapa server (setiap server menjalankan proses uWSGI dan thread gevent) dibelakang Nginx.

Dengan ini, kita dapat meng-handle ribuan koneksi secara bersamaan.

Kita juga bisa menerapkan teknik clustering dan loadbalancing ke RabbitMQ jika perlu melakukan scale out. Dokumentasinya ada di https://www.rabbitmq.com/ha.html

Semoga seri tutorial ini bermaanfaat dan membantu pembaca dalam belajar.