đReal-Time Money Streaming Protocol (RTMS)
The protocol will allow for continuous streaming of payments until the merchant decides to stop the stream. The operator will track the streaming status and amounts, and the merchant can end the stream at any time.
Key Components
Operator: Manages the streaming money protocol and handles payment intents.
StreamIntent: Represents an intent to start a streaming payment between two parties.
PaymentMethod: Defines various payment methods such as UPI, IMPS, etc.
WebSocket: Provides real-time updates to both the merchant and the customer.
Security: Ensures all transactions and communications are secure.
API Specifications
1. Create StreamIntent
Endpoint: POST /stream-intent
Description: Creates a new stream intent for a payment.
Parameters:
amount_per_interval
(required): The amount to be transferred per interval (e.g., per minute).currency
(required): The currency code (e.g., INR).payment_method
(required): The payment method (UPI, IMPS, WALLET...).merchant_id
(required): The identifier for the merchant.customer_id
(required): The identifier for the customer.callback_url
(optional): The URL to notify the merchant of the payment status.
Response:
stream_intent_id
: Unique identifier for the stream intent.payment_details
: Details or URL for initiating the payment stream.
2. Get StreamIntent Status
Endpoint: GET /stream-intent/:id
Description: Retrieves the status of a stream intent.
Parameters:
id
(required): Unique identifier for the stream intent.
Response:
stream_intent_id
: Unique identifier for the stream intent.status
: Status of the stream intent (e.g., ACTIVE, PAUSED, COMPLETED, FAILED).amount
: The total amount involved in the stream.payment_method
: The payment method used.transaction_id
: The transaction ID from the payment network.
3. Update StreamIntent (Pause/Resume/Stop)
Endpoint: PATCH /stream-intent/:id
Description: Updates the status of a stream intent.
Parameters:
id
(required): Unique identifier for the stream intent.action
(required): The action to perform (PAUSE, RESUME, STOP).
Response:
stream_intent_id
: Unique identifier for the stream intent.status
: Updated status of the stream intent (e.g., PAUSED, RESUMED, STOPPED).
4. WebSocket for Real-Time Updates
WebSocket Endpoint: /ws
Description: Provides real-time updates on the status of the stream intents.
Implementation in Node.js
Below is the Node.js implementation of the streaming money protocol with WebSocket support:
Dependencies
Install the required dependencies:
npm install express axios body-parser crypto mongoose ws node-cron
Server Setup
Create a server with the specified endpoints and WebSocket support:
const express = require('express');
const axios = require('axios');
const bodyParser = require('body-parser');
const crypto = require('crypto');
const mongoose = require('mongoose');
const WebSocket = require('ws');
const cron = require('node-cron');
const app = express();
app.use(bodyParser.json());
// Connect to MongoDB
mongoose.connect('mongodb://localhost:27017/streaming', { useNewUrlParser: true, useUnifiedTopology: true });
// Define schemas
const MerchantSchema = new mongoose.Schema({
id: String,
webhook_url: String,
});
const StreamIntentSchema = new mongoose.Schema({
stream_intent_id: String,
amount_per_interval: Number,
currency: String,
payment_method: String,
merchant_id: String,
customer_id: String,
status: String,
start_time: Date,
last_transfer_time: Date,
total_transferred: Number,
callback_url: String,
});
const Merchant = mongoose.model('Merchant', MerchantSchema);
const StreamIntent = mongoose.model('StreamIntent', StreamIntentSchema);
// Endpoint to create a stream intent
app.post('/stream-intent', async (req, res) => {
const { amount_per_interval, currency, payment_method, merchant_id, customer_id, callback_url } = req.body;
const stream_intent_id = crypto.randomBytes(16).toString('hex');
const start_time = new Date();
const streamIntent = new StreamIntent({
stream_intent_id,
amount_per_interval,
currency,
payment_method,
merchant_id,
customer_id,
status: 'ACTIVE',
start_time,
last_transfer_time: start_time,
total_transferred: 0,
callback_url: callback_url || '',
});
await streamIntent.save();
res.json({ stream_intent_id, amount_per_interval, start_time });
});
// Endpoint to get stream intent status
app.get('/stream-intent/:id', async (req, res) => {
const { id } = req.params;
const streamIntent = await StreamIntent.findOne({ stream_intent_id: id });
if (!streamIntent) {
return res.status(404).send('Stream Intent not found');
}
res.json(streamIntent);
});
// Endpoint to update stream intent (Pause/Resume/Stop)
app.patch('/stream-intent/:id', async (req, res) => {
const { id } = req.params;
const { action } = req.body;
const streamIntent = await StreamIntent.findOne({ stream_intent_id: id });
if (!streamIntent) {
return res.status(404).send('Stream Intent not found');
}
if (action === 'PAUSE') {
streamIntent.status = 'PAUSED';
} else if (action === 'RESUME') {
streamIntent.status = 'ACTIVE';
} else if (action === 'STOP') {
streamIntent.status = 'STOPPED';
} else {
return res.status(400).send('Invalid action');
}
streamIntent.last_update_time = new Date();
await streamIntent.save();
res.json(streamIntent);
});
// Endpoint to handle payment status webhooks
app.post('/webhook', async (req, res) => {
const { transaction_id, stream_intent_id, status, amount } = req.body;
const streamIntent = await StreamIntent.findOne({ stream_intent_id });
if (streamIntent) {
streamIntent.status = status;
streamIntent.transaction_id = transaction_id;
streamIntent.amount_streamed = amount;
streamIntent.last_update_time = new Date();
await streamIntent.save();
if (streamIntent.callback_url) {
notifyMerchant(streamIntent);
}
}
res.status(200).send('Event received');
});
// Notify merchant about transaction status
async function notifyMerchant(streamIntent) {
const { merchant_id, status, amount, transaction_id, callback_url } = streamIntent;
const merchant = await Merchant.findOne({ id: merchant_id });
if (merchant) {
axios.post(callback_url, {
transaction_id,
stream_intent_id: streamIntent.stream_intent_id,
status,
amount,
}).then(response => {
console.log('Merchant notified successfully');
}).catch(error => {
console.log('Failed to notify merchant', error);
});
}
}
// WebSocket setup for real-time updates
const wss = new WebSocket.Server({ server: app.listen(3000, () => console.log('Server running on port 3000')) });
wss.on('connection', (ws) => {
console.log('New WebSocket connection');
ws.on('message', (message) => {
console.log(`Received message: ${message}`);
});
ws.on('close', () => {
console.log('WebSocket connection closed');
});
});
// Scheduled task for streaming payments
cron.schedule('* * * * *', async () => {
const activeStreams = await StreamIntent.find({ status: 'ACTIVE' });
for (const stream of activeStreams) {
const now = new Date();
if (now >= stream.last_transfer_time.getTime() + 60000) {
// Perform transfer
const success = await performTransfer(stream);
if (success) {
stream.total_transferred += stream.amount_per_interval;
stream.last_transfer_time = now;
await stream.save();
// Notify WebSocket clients
wss.clients.forEach(client => {
if (client.readyState === WebSocket.OPEN) {
client.send(JSON.stringify({
stream_intent_id: stream.stream_intent_id,
status: stream.status,
total_transferred: stream.total_transferred,
}));
}
});
} else {
console.error(`Failed to transfer for stream: ${stream.stream_intent_id}`);
}
}
}
});
async function performTransfer(stream) {
// Logic to perform the actual transfer
// This could be an API call to a payment gateway, a blockchain transaction, etc.
// For example:
const { merchant_id, customer_id, amount_per_interval, payment_method } = stream;
// Dummy implementation
console.log(`Transferring ${amount_per_interval} ${stream.currency} from ${customer_id} to ${merchant_id}`);
return true; //
Last updated