Real-Time Money Streaming Protocol (RTMS)

The protocol streams payments continuously from a customer to a merchant. The operator transfers a fixed amount at each interval and tracks the stream's status and running total. The stream stays open until the merchant pauses or stops it, which the merchant can do at any time.

Key Components

  1. Operator: Manages the streaming money protocol and handles payment intents.

  2. StreamIntent: Represents an intent to start a streaming payment between two parties.

  3. PaymentMethod: Defines various payment methods such as UPI, IMPS, etc.

  4. WebSocket: Provides real-time updates to both the merchant and the customer.

  5. Security: Ensures all transactions and communications are secure.
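
One concrete piece of the Security component is authenticating inbound payment-status webhooks, since the /webhook handler in the implementation below trusts whatever body it receives. The following is a minimal sketch, assuming the payment provider signs the raw request body with HMAC-SHA256 using a shared secret and sends the hex digest alongside the request. The secret, the hex encoding, and the helper names signPayload and verifySignature are assumptions for illustration and are not defined by the protocol.

```javascript
const crypto = require('crypto');

// Hypothetical helper: hex HMAC-SHA256 of a raw request body.
function signPayload(rawBody, secret) {
    return crypto.createHmac('sha256', secret).update(rawBody).digest('hex');
}

// Hypothetical helper: check a received signature in constant time.
function verifySignature(rawBody, receivedSignature, secret) {
    const expected = Buffer.from(signPayload(rawBody, secret), 'hex');
    const received = Buffer.from(String(receivedSignature), 'hex');
    // timingSafeEqual throws on a length mismatch, so compare lengths first.
    if (expected.length !== received.length) {
        return false;
    }
    return crypto.timingSafeEqual(expected, received);
}

// Usage with a made-up secret and payload.
const secret = 'demo-shared-secret';
const body = JSON.stringify({ stream_intent_id: 'abc123', status: 'ACTIVE' });
const signature = signPayload(body, secret);
console.log(verifySignature(body, signature, secret));       // true
console.log(verifySignature(body + ' ', signature, secret)); // false
```

A handler would run this check against the raw bytes of the request before parsing or acting on the body, and reject the request when the check fails.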

API Specifications

1. Create StreamIntent

Endpoint: POST /stream-intent

Description: Creates a new stream intent for a payment.

Parameters:

  • amount_per_interval (required): The amount to be transferred per interval (e.g., per minute).

  • currency (required): The currency code (e.g., INR).

  • payment_method (required): The payment method (UPI, IMPS, WALLET...).

  • merchant_id (required): The identifier for the merchant.

  • customer_id (required): The identifier for the customer.

  • callback_url (optional): The URL to notify the merchant of the payment status.

Response:

  • stream_intent_id: Unique identifier for the stream intent.

  • payment_details: Details or URL for initiating the payment stream.
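
The required parameters above can be checked before a stream intent is stored. This is a sketch only: the validator name, the closed list of payment methods (the specification's list is open-ended), and the three-letter currency check are assumptions, not rules stated by the protocol.

```javascript
// Assumed set of accepted payment methods; the specification leaves the list open.
const PAYMENT_METHODS = ['UPI', 'IMPS', 'WALLET'];
const REQUIRED_FIELDS = ['amount_per_interval', 'currency', 'payment_method', 'merchant_id', 'customer_id'];

// Hypothetical validator: returns a list of error messages (empty when valid).
function validateStreamIntentRequest(body) {
    const errors = [];
    for (const field of REQUIRED_FIELDS) {
        if (body[field] === undefined || body[field] === null || body[field] === '') {
            errors.push(`${field} is required`);
        }
    }
    const amount = body.amount_per_interval;
    if (amount != null && !(typeof amount === 'number' && Number.isFinite(amount) && amount > 0)) {
        errors.push('amount_per_interval must be a positive number');
    }
    if (typeof body.currency === 'string' && body.currency !== '' && !/^[A-Z]{3}$/.test(body.currency)) {
        errors.push('currency must be a three-letter uppercase code');
    }
    if (typeof body.payment_method === 'string' && body.payment_method !== '' &&
        !PAYMENT_METHODS.includes(body.payment_method)) {
        errors.push(`payment_method must be one of ${PAYMENT_METHODS.join(', ')}`);
    }
    return errors;
}

// Usage with made-up identifiers.
const validRequest = {
    amount_per_interval: 2.5,
    currency: 'INR',
    payment_method: 'UPI',
    merchant_id: 'm_1',
    customer_id: 'c_1',
};
console.log(validateStreamIntentRequest(validRequest));        // []
console.log(validateStreamIntentRequest({ currency: 'inr' })); // five error messages
```

A handler could respond with HTTP 400 and the error list whenever the returned array is non-empty.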

2. Get StreamIntent Status

Endpoint: GET /stream-intent/:id

Description: Retrieves the status of a stream intent.

Parameters:

  • id (required): Unique identifier for the stream intent.

Response:

  • stream_intent_id: Unique identifier for the stream intent.

  • status: Status of the stream intent (e.g., ACTIVE, PAUSED, STOPPED, COMPLETED, FAILED).

  • amount: The total amount involved in the stream.

  • payment_method: The payment method used.

  • transaction_id: The transaction ID from the payment network.
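
The response fields above do not line up one-to-one with what the reference implementation stores, and the GET handler there returns the whole stored record. A small mapper can produce exactly the documented shape. This is a sketch under assumptions: the function name is hypothetical, and treating amount as the running total transferred so far is one possible reading of "total amount involved in the stream".

```javascript
// Hypothetical mapper from a stored stream record to the documented response fields.
function toStatusResponse(record) {
    return {
        stream_intent_id: record.stream_intent_id,
        status: record.status,
        // Assumption: "amount" reports the running total transferred so far.
        amount: record.total_transferred ?? 0,
        payment_method: record.payment_method,
        // Null until the payment network has reported a transaction.
        transaction_id: record.transaction_id ?? null,
    };
}

// Usage with a made-up stored record.
const storedRecord = {
    stream_intent_id: 'abc123',
    status: 'ACTIVE',
    total_transferred: 15,
    payment_method: 'UPI',
    customer_id: 'c_1',
};
console.log(toStatusResponse(storedRecord));
```

Mapping explicitly also keeps internal fields such as customer_id and callback_url out of the response.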

3. Update StreamIntent (Pause/Resume/Stop)

Endpoint: PATCH /stream-intent/:id

Description: Updates the status of a stream intent.

Parameters:

  • id (required): Unique identifier for the stream intent.

  • action (required): The action to perform (PAUSE, RESUME, STOP).

Response:

  • stream_intent_id: Unique identifier for the stream intent.

  • status: Updated status of the stream intent (PAUSED after PAUSE, ACTIVE after RESUME, STOPPED after STOP).
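
The specification lists the actions but not which ones are legal from which status. The PATCH handler in the implementation below applies any action to any stream, so a STOPPED stream could be resumed. One way to pin this down is a transition table. The table below is an assumption for illustration (in particular, that STOPPED is terminal), not a rule stated by the protocol.

```javascript
// Assumed transition table: current status -> action -> next status.
const TRANSITIONS = {
    ACTIVE: { PAUSE: 'PAUSED', STOP: 'STOPPED' },
    PAUSED: { RESUME: 'ACTIVE', STOP: 'STOPPED' },
};

function hasOwn(obj, key) {
    return Object.prototype.hasOwnProperty.call(obj, key);
}

// Returns the next status, or throws if the action is not allowed.
function nextStatus(currentStatus, action) {
    const allowed = hasOwn(TRANSITIONS, currentStatus) ? TRANSITIONS[currentStatus] : {};
    if (!hasOwn(allowed, action)) {
        throw new Error(`Cannot ${action} a stream that is ${currentStatus}`);
    }
    return allowed[action];
}

console.log(nextStatus('ACTIVE', 'PAUSE'));  // PAUSED
console.log(nextStatus('PAUSED', 'RESUME')); // ACTIVE
```

A handler built on this would answer an illegal transition with an error status such as 409 instead of silently overwriting the stored status.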

4. WebSocket for Real-Time Updates

WebSocket Endpoint: /ws

Description: Provides real-time updates on the status of the stream intents.
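
The specification does not define how updates are routed to clients, and the implementation below broadcasts every update to every connected client. The sketch below shows one way to scope delivery to the parties of a stream, using an in-memory subscription registry. The class, its methods, and the subscribe-by-stream-id model are assumptions. Clients only need a send(string) method, which sockets from the ws library provide.

```javascript
// Hypothetical registry: stream_intent_id -> set of subscribed clients.
class SubscriptionRegistry {
    constructor() {
        this.byStream = new Map();
    }

    subscribe(streamIntentId, client) {
        if (!this.byStream.has(streamIntentId)) {
            this.byStream.set(streamIntentId, new Set());
        }
        this.byStream.get(streamIntentId).add(client);
    }

    // Remove a client from every stream, e.g. when its socket closes.
    unsubscribe(client) {
        for (const [id, clients] of this.byStream) {
            clients.delete(client);
            if (clients.size === 0) {
                this.byStream.delete(id);
            }
        }
    }

    // Send an update only to clients subscribed to that stream; returns the delivery count.
    publish(update) {
        const clients = this.byStream.get(update.stream_intent_id) || new Set();
        const payload = JSON.stringify(update);
        let delivered = 0;
        for (const client of clients) {
            client.send(payload);
            delivered += 1;
        }
        return delivered;
    }
}

// Usage with stand-in clients that record what they receive.
const makeFakeClient = () => ({ sent: [], send(message) { this.sent.push(message); } });
const registry = new SubscriptionRegistry();
const merchantClient = makeFakeClient();
const otherClient = makeFakeClient();
registry.subscribe('abc123', merchantClient);
registry.subscribe('zzz999', otherClient);
registry.publish({ stream_intent_id: 'abc123', status: 'ACTIVE', total_transferred: 10 });
console.log(merchantClient.sent.length, otherClient.sent.length); // 1 0
```

In a real server the subscription would also need an authorization check, so that a client can only subscribe to streams it is a party to.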

Implementation in Node.js

Below is the Node.js implementation of the streaming money protocol with WebSocket support:

Dependencies

Install the required dependencies (crypto is built into Node.js and does not need to be installed):

npm install express axios body-parser mongoose ws node-cron

Server Setup

Create a server with the specified endpoints and WebSocket support:

const express = require('express');
const axios = require('axios');
const bodyParser = require('body-parser');
const crypto = require('crypto');
const mongoose = require('mongoose');
const WebSocket = require('ws');
const cron = require('node-cron');

const app = express();
app.use(bodyParser.json());

// Connect to MongoDB
mongoose.connect('mongodb://localhost:27017/streaming', { useNewUrlParser: true, useUnifiedTopology: true });

// Define schemas
const MerchantSchema = new mongoose.Schema({
    id: String,
    webhook_url: String,
});

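const StreamIntentSchema = new mongoose.Schema({
    stream_intent_id: String,
    amount_per_interval: Number,
    currency: String,
    payment_method: String,
    merchant_id: String,
    customer_id: String,
    status: String,
    start_time: Date,
    last_transfer_time: Date,
    total_transferred: Number,
    callback_url: String,
    // Fields written by the PATCH and /webhook handlers; without them,
    // Mongoose's strict mode would silently drop the values on save
    transaction_id: String,
    amount_streamed: Number,
    last_update_time: Date,
});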

const Merchant = mongoose.model('Merchant', MerchantSchema);
const StreamIntent = mongoose.model('StreamIntent', StreamIntentSchema);

// Endpoint to create a stream intent
app.post('/stream-intent', async (req, res) => {
    const { amount_per_interval, currency, payment_method, merchant_id, customer_id, callback_url } = req.body;
    const stream_intent_id = crypto.randomBytes(16).toString('hex');
    const start_time = new Date();

    const streamIntent = new StreamIntent({
        stream_intent_id,
        amount_per_interval,
        currency,
        payment_method,
        merchant_id,
        customer_id,
        status: 'ACTIVE',
        start_time,
        last_transfer_time: start_time,
        total_transferred: 0,
        callback_url: callback_url || '',
    });
    await streamIntent.save();

    res.json({ stream_intent_id, amount_per_interval, start_time });
});

// Endpoint to get stream intent status
app.get('/stream-intent/:id', async (req, res) => {
    const { id } = req.params;
    const streamIntent = await StreamIntent.findOne({ stream_intent_id: id });
    if (!streamIntent) {
        return res.status(404).send('Stream Intent not found');
    }
    res.json(streamIntent);
});

// Endpoint to update stream intent (Pause/Resume/Stop)
app.patch('/stream-intent/:id', async (req, res) => {
    const { id } = req.params;
    const { action } = req.body;
    const streamIntent = await StreamIntent.findOne({ stream_intent_id: id });
    if (!streamIntent) {
        return res.status(404).send('Stream Intent not found');
    }

    if (action === 'PAUSE') {
        streamIntent.status = 'PAUSED';
    } else if (action === 'RESUME') {
        streamIntent.status = 'ACTIVE';
    } else if (action === 'STOP') {
        streamIntent.status = 'STOPPED';
    } else {
        return res.status(400).send('Invalid action');
    }

    streamIntent.last_update_time = new Date();
    await streamIntent.save();
    res.json(streamIntent);
});

// Endpoint to handle payment status webhooks
app.post('/webhook', async (req, res) => {
    const { transaction_id, stream_intent_id, status, amount } = req.body;
    const streamIntent = await StreamIntent.findOne({ stream_intent_id });
    if (streamIntent) {
        streamIntent.status = status;
        streamIntent.transaction_id = transaction_id;
        streamIntent.amount_streamed = amount;
        streamIntent.last_update_time = new Date();
        await streamIntent.save();
        if (streamIntent.callback_url) {
            notifyMerchant(streamIntent);
        }
    }
    res.status(200).send('Event received');
});

// Notify merchant about transaction status
async function notifyMerchant(streamIntent) {
    // amount_streamed is the field set by the /webhook handler
    const { merchant_id, status, amount_streamed, transaction_id, callback_url } = streamIntent;
    try {
        const merchant = await Merchant.findOne({ id: merchant_id });
        if (!merchant) {
            return;
        }
        await axios.post(callback_url, {
            transaction_id,
            stream_intent_id: streamIntent.stream_intent_id,
            status,
            amount: amount_streamed,
        });
        console.log('Merchant notified successfully');
    } catch (error) {
        console.error('Failed to notify merchant', error.message);
    }
}

// Start the HTTP server, then attach the WebSocket server at the documented /ws path
const server = app.listen(3000, () => console.log('Server running on port 3000'));
const wss = new WebSocket.Server({ server, path: '/ws' });

wss.on('connection', (ws) => {
    console.log('New WebSocket connection');

    ws.on('message', (message) => {
        console.log(`Received message: ${message}`);
    });

    ws.on('close', () => {
        console.log('WebSocket connection closed');
    });
});

// Scheduled task for streaming payments
cron.schedule('* * * * *', async () => {
    const activeStreams = await StreamIntent.find({ status: 'ACTIVE' });

    for (const stream of activeStreams) {
        const now = new Date();
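        // Compare epoch milliseconds explicitly. Cron ticks jitter by a few milliseconds,
        // so allow one second of slack; otherwise every second tick can be skipped.
        if (now.getTime() - stream.last_transfer_time.getTime() >= 59000) {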
            // Perform transfer
            const success = await performTransfer(stream);

            if (success) {
                stream.total_transferred += stream.amount_per_interval;
                stream.last_transfer_time = now;
                await stream.save();

                // Notify WebSocket clients
                wss.clients.forEach(client => {
                    if (client.readyState === WebSocket.OPEN) {
                        client.send(JSON.stringify({
                            stream_intent_id: stream.stream_intent_id,
                            status: stream.status,
                            total_transferred: stream.total_transferred,
                        }));
                    }
                });
            } else {
                console.error(`Failed to transfer for stream: ${stream.stream_intent_id}`);
            }
        }
    }
});

async function performTransfer(stream) {
    // Placeholder for the actual transfer. This could be an API call to a
    // payment gateway, a blockchain transaction, etc.
    const { merchant_id, customer_id, amount_per_interval, currency } = stream;

    // Dummy implementation: log the transfer and report success
    console.log(`Transferring ${amount_per_interval} ${currency} from ${customer_id} to ${merchant_id}`);
    return true;
}

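The scheduled task above transfers at most one interval's amount per tick, so a missed tick (for example, during a restart) leaves intervals unbilled. The sketch below shows how the number of due intervals and the amount owed could be computed instead. Whether missed intervals should be billed at all is not specified by the protocol. The helper names and the use of integer minor units (for example, paise) are assumptions for illustration.

```javascript
const INTERVAL_MS = 60000; // one minute, matching the scheduler's interval

// Number of whole intervals elapsed since the last transfer.
function intervalsDue(lastTransferTime, now, intervalMs = INTERVAL_MS) {
    const elapsed = now.getTime() - lastTransferTime.getTime();
    return elapsed > 0 ? Math.floor(elapsed / intervalMs) : 0;
}

// Amount owed for the elapsed intervals, in integer minor units to avoid float drift.
function amountDueMinor(amountPerIntervalMinor, lastTransferTime, now) {
    return amountPerIntervalMinor * intervalsDue(lastTransferTime, now);
}

// Advance the last transfer time by whole intervals so leftover time carries over.
function advance(lastTransferTime, intervals, intervalMs = INTERVAL_MS) {
    return new Date(lastTransferTime.getTime() + intervals * intervalMs);
}

// Usage: 3.5 minutes have passed at 250 paise per minute.
const lastTransfer = new Date('2024-01-01T10:00:00Z');
const currentTime = new Date('2024-01-01T10:03:30Z');
const due = intervalsDue(lastTransfer, currentTime);
console.log(due);                                                  // 3
console.log(amountDueMinor(250, lastTransfer, currentTime));       // 750
console.log(advance(lastTransfer, due).toISOString());             // 2024-01-01T10:03:00.000Z
```

Advancing by whole intervals, rather than setting the last transfer time to the current time, keeps the remaining 30 seconds counted toward the next interval.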