Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

A2 gateway rabbitmq #49

Merged
merged 4 commits into from
Mar 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 128 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
# Orion



Release branch names:
- `a1-plot-graph-release`
- `a1-data-ingestor-release`
- `a1-registry-release`
- `a1-gateway-release`
- `a1-ui-release`

---
#### Please follow the order of execution of microservices as mentioned in this document
---

## Plot Graph Microservice

Following are the steps for running the microservice:

### Using Docker

For running it with Docker, please pull the repository and run the following commands:
```
git clone https://github.com/airavata-courses/orion
cd orion
git checkout a1-plot-graph-release
cd plot-weather-microservice
```

```
docker build . -t plot
docker run -d --name adsA1-plot -p 8000:8000 plot
```

**NOTE:** This microservice runs at PORT number **`8000`**

### Build from Source


Please pull the repository and run the following commands:
```
git clone https://github.com/airavata-courses/orion
cd orion
git checkout a1-data-ingestor-release
cd plot-weather-microservice
```

```
python3 manage.py runserver
```

## Data Ingestor Microservice

Following are the steps for running the microservice:

### Build from Source


Please pull the repository and run the following commands:
```
git clone https://github.com/airavata-courses/orion
cd orion
git checkout a1-data-ingestor-release
cd weather-data-ingestor-microservice
```

```
npm install
npm start
```

**NOTE:** This microservice runs at PORT number **`3001`**



## Gateway Microservice

Following are the steps for running the microservice:

### Build from Source

Please pull the repository and run the following commands:
```
git clone https://github.com/airavata-courses/orion
cd orion
git checkout a1-gateway-release
cd gateway-service
```

```
npm install
npm start
```

**NOTE:** This microservice runs at PORT number **`3000`**




## Registry Microservice

Since this microservice has external dependencies (MySQL), please refer its standalone README [here](https://github.com/airavata-courses/orion/blob/a1-registry-release/registry-service/README.md)

**NOTE:** This microservice runs at PORT number **`8091`**




## UI Microservice

Following are the steps for running the microservice:

### Build from Source

Please pull the repository and run the following commands:
```
git clone https://github.com/airavata-courses/orion
cd orion
git checkout a1-ui-release

```

```
npm install
npm start
```

**NOTE:** This microservice runs at PORT number **`3002`**
11 changes: 11 additions & 0 deletions gateway-service/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
FROM node:16
# Create app directory
WORKDIR ./
# Install app dependencies
COPY package*.json ./

RUN npm install

COPY ./ ./

CMD [ "npm", "run", "start" ]
204 changes: 129 additions & 75 deletions gateway-service/app.js
Original file line number Diff line number Diff line change
@@ -1,10 +1,19 @@

var createError = require('http-errors');
var express = require('express');
var path = require('path');
var cookieParser = require('cookie-parser');
var logger = require('morgan');
var axios = require('axios');
var cors = require('cors')

var amqp = require('amqplib/callback_api');

let respList = [];
var connectionVar;
var channelVar;

amqp.connect('amqp://orionRabbit', ampqConnectionInit);

const http = require("http");
const WebSocket = require("ws");
const serverSocket = require("./socket.js");
Expand All @@ -16,14 +25,18 @@ app.use(cors({ credentials: true,
origin: "http://localhost:3002",
}));

var index = require('./routes/index');
var apiHelper = require('./routes/apiHelper');
// var index = require('./routes/index');
var registry = require('./routes/registry');
// const { syncBuiltinESMExports } = require('module');
// const { channel } = require('diagnostics_channel');

// var apiHelper = require('./routes/apiHelper');
// view engine setup
app.use(express.json());
app.set('views', path.join(__dirname, 'views'));
app.set('view engine', 'jade');
app.use(index);

const server = http.createServer(app);
// app.use("/index", index);
app.use("/registry", registry);

const wss = new WebSocket.Server({ server });
serverSocket.initWS(wss);
Expand All @@ -34,73 +47,6 @@ app.use(express.urlencoded({ extended: false }));
app.use(cookieParser());
app.use(express.static(path.join(__dirname, 'public')));

app.get('/registry/status', function(req, resp, next) {
console.log('This just called', req.body);
axios
.get('http://localhost:8091/registry/status')
.then(res => {
resp.send(res.data)
})
});


app.post('/registry/status', function(req, resp, next) {
console.log('Post Status just called', req.body);
axios
.post('http://localhost:8091/registry/status', req.body)
.then( res => {
resp.send(res.data)
})
.catch(err => console.log(err))
});

app.post('/orionweather', function(req, resp, next) {
// resp.header("Access-Control-Allow-Origin", "*");

var entryId = -1;
// From UI to Registry
axios
.post('http://localhost:8091/registry/newRequest',req.body)
.then(responseFromReg => {
entryId = responseFromReg.data;
});
// From Gateway to Ingestor
axios
.post('http://localhost:3001/api/uri/images/',req.body)
.then(ingestorResponse => {
let ingestorUri = ingestorResponse.data;
// From Ingestor to Registry
axios
.post('http://localhost:8091/registry/ingestorResponse',{entryId, ingestorUri})
.then(responseFromRegi => {
});
if(ingestorResponse.status == 200) {
// From Gateway to Plot
axios
.post('http://localhost:8000/plotWeather/plotgraph',ingestorResponse.data)
.then(responseFromPlotter => {

// From Plotter to Registry
let plotData = responseFromPlotter.data;
axios
.post('http://localhost:8091/registry/plotResponse',{entryId, plotData})
.then(responseFromRegistry => {
});
resp.send(responseFromPlotter.data);
})
} else {
resp.send().status(400);
// io.emit("gateway", {status:{dataingestor:false, plot:false}});
// // axios.put('http://localhost:3001/status').status(400);
}
});
});

// catch 404 and forward to error handler
app.use(function(req, res, next) {
next(createError(404));
});

// error handler
app.use(function(err, req, res, next) {
// set locals, only providing error in development
Expand All @@ -111,7 +57,115 @@ app.use(function(err, req, res, next) {
res.status(err.status || 500);
res.render('error');
});
var port = process.env.PORT || '4000';
var correlationIds = [];
app.listen(port, () => console.log(`Listening on port ${port}`));

app.post('/orionweather', postHandler);


// catch 404 and forward to error handler
app.use(function(req, res, next) {
next(createError(404));
});

server.listen(port, () => console.log(`Listening on port ${port}`));
function generateUuid() {
return Math.random().toString() +
Math.random().toString() +
Math.random().toString();
}

function ampqConnectionInit(error0, connection) {
if (error0) {
throw error0;
}
console.log("Connection to Rabbit MQ successful");
connectionVar = connection;
connectionVar.createChannel(function(error1, channel) {
if (error1) {
throw error1;
}
console.log("Channel created for Rabbit MQ");
channelVar = channel;
});
}

function ampqConnectionHandler(error0, connection) {
if (error0) {
respBody = {"error":"Could not create connection"};
console.log(respBody);
throw error0;
}
console.log("We have connection now",connection);
connectionVar = connection;
}

function ampqChannelHandler(error1, channel) {
if (error1) {
respBody = {"error":"Could not create channel"};
console.log(respBody);
// resp.json(respBody).status(500);
throw error1;
}
console.log("Channel Created now");
channelVar = channel;
}

async function postHandler(req, resp, next) {
let respBody;
// resp.header("Access-Control-Allow-Origin", "*");
console.log("Received POST request at orionweather");
console.log(req.body);
if(!connectionVar) {
amqp.connect('amqp://orionRabbit', ampqConnectionHandler);
}
if(!channelVar) {
connectionVar.createChannel(ampqChannelHandler);
}

channelVar.assertQueue('gateway_rx', {
exclusive: false
}, function(error2, q) {
if (error2) {
respBody = {"error":"Could not connect to queue to send message"};
console.log(respBody);
throw error2;
}
console.log("gateway_rx channel association successful");
var correlationId = generateUuid();
correlationIds.push(correlationId);
let stringData = JSON.stringify(req.body);
console.log("This is how it's getting sent: ",stringData);
channelVar.sendToQueue('ingestor_rx',
Buffer.from(stringData),{
correlationId,
replyTo: "gateway_rx" });
});
var nLog;
channelVar.consume("gateway_rx", function(msg) {
let correlationRecv = msg.properties.correlationId;
if (correlationIds.indexOf(correlationRecv)>-1) {
correlationIds.filter(function(value, index, arr){
return value != correlationRecv;
});
nLog = JSON.parse(msg.content.toString());
console.log(' [.] Received from queue: ', nLog);
respList.push(nLog);
}
}, {
noAck: false
});
await sleep(2000);
let val = respList.pop();
console.log(val);
resp.json(val).status(200).send();
return;
}

function sleep(ms) {
return new Promise((resolve) => {
setTimeout(resolve, ms);
});
}

module.exports = app;
module.exports = app;
1 change: 1 addition & 0 deletions gateway-service/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
"start": "node ./bin/www"
},
"dependencies": {
"amqplib": "^0.8.0",
"axios": "^0.25.0",
"body-parser": "^1.19.1",
"cookie-parser": "~1.4.4",
Expand Down
Loading