Hkube

Algorithms

Integrate algorithms into Hkube is quite easy, it's include 3 steps:
1) Push the algorithm to docker hub.
2) Implement connectivity with Hkube.
3) Add the algorithm to Hkube.

Hkube communicate with algorithms via WebSocket because the full-duplex communication support.
All messages between Hkube and algorithm are in JSON format.

Events From Hkube to Algorithm #


These events are sent from Hkube to your algorithm.

JSON #
{
   "command": "<string>", // one of the above
   "data": "<Object>"
}

Event: initialize #

The first event that sent to the algorithm, sent for every task activation.

{
   "command": "initialize",
   "data": {
       "input": ["str", 512, false, {"foo":"bar"}]
   }
}

data include input array, same input as written in the descriptor

Event: start #

Event to start the algorithm task

{
   "command": "start"
}

This event include no data

Event: stop #

Event to abort the running algorithm task

{
   "command": "stop"
}

Event: exit #

Event to exit

{
   "command": "exit"
}

Event: subPipelineStarted #

Event to inform algorithm that sub pipeline (Raw or Stored) has started

{
   "command": "subPipelineStarted",
   "data": {
       "subPipelineId": "<alg-subPipeline-internal-id>"
   }
}

The "subPipelineId" property holds the sub pipeline internal Id in algorithm (as given in startRawSubPipeline/startStoredSubPipeline events).

Event: subPipelineError #

Event to inform algorithm that sub pipeline (Raw or Stored) has failed.

{
   "command": "subPipelineError",
   "data": {
       "subPipelineId": "<alg-subPipeline-internal-id>"
       "error": "error-message"
   }
}
  • The "subPipelineId" property holds the sub pipeline internal Id in algorithm (as given in startRawSubPipeline/startStoredSubPipeline events).
  • The "error" property holds the error message text from sub pipeline.

Event: subPipelineDone #

Event to inform algorithm that sub pipeline (Raw or Stored) has completed successfully.

{
   "command": "subPipelineDone",
   "data": {
       "subPipelineId": "<alg-subPipeline-internal-id>"
       "response": ["array", "of", "subpipeline", "output", "values"]
   }
}
  • The "subPipelineId" property holds the sub pipeline internal Id in algorithm (as given in startRawSubPipeline/startStoredSubPipeline events), as the algorithm may start several sub-pipelines.
  • The "response" property holds the sub pipeline output array.

Event: subPipelineStopped #

Event to inform algorithm that sub pipeline has stopped

{
   "command": "subPiplineStopped",
   "data": {
       "subPipelineId": "<alg-subPipeline-internal-id>",
       "reason": "<stopping-reason>"
   }
}
  • The "subPipelineId" property holds the sub pipeline internal Id in algorithm (as given in startRawSubPipeline/startStoredSubPipeline events).
  • The "reason" property holds the reason for stopping the sub pipeline.

Events From Algorithm to Hkube #


These events are sent from algorithm to Hkube.

JSON #
{
   "command": "<string>", // one of the above
   "data": "<Any>",
   "error": "<Object>" {
      "code": "<string>",
      "message": "<string>",
      "details": "<string>"
   }
}

Event: initialized #

Response event after initialization complete.

{
   "command": "initialized"
}

Event: started #

Response event after start complete.

{
   "command": "started"
}

Event: stopped #

Response event after stop complete.

{
   "command": "stopped"
}

Event: done #

Response event after the algorithm finish the task.

{
   "command": "done"
}

Event: progress #

If you want to report progress about your algorithm, send this event.

{
   "command": "progress",
   "data": "optional extra details"
}

Event: errorMessage #

If any error occurs in your algorithm, send this event.

{
   "command": "errorMessage",
   "error": {
      "code": "<YOUR_CODE>",
      "message": "<YOUR_MESSAGE>",
      "details": "<YOUR_DETAILS>"
   }
}

Event: startRawSubPipeline #

If you want to start a Raw sub-pipeline from your algorithm, use this event.

{
   "command": "startRawSubPipeline",
   "data": {
        "subPipeline": {
            "name": "<sub-pipeline-name>",
            "nodes": [
                {
                    "nodeName": "<first-node-name>",
                    "algorithmName": "<alg-name>",
                    "input":    ["@flowInput.data"]
                }
            ],
            "options": {
            },
            "webhooks": {
            },
            "flowInput": {
               "data": ["array", "of", "subpipeline", "input", "values"]
            }
        },
        "subPipelineId": "<alg-subPipeline-internal-id>",
   }
}
  • The "subPipeline" object gives a standard raw full description of the requested sub pipeline.
  • The "input" field value of the first node should be ["@flowInput.data"]
  • This input is taken from "flowInput", where you plant your subpipeline input in the "data" field.
  • The "subPipelineId" property holds sub pipeline internal Id in algorithm (as the algorithm may start several sub-pipelines).

Event: startStoredSubPipeline #

If you want to start a Stored sub-pipeline from your algorithm, use this event.

{
   "command": "startStoredSubPipeline",
   "data": {
        "subPipeline": {
            "name": "<stored-sub-pipeline-name>",
            "flowInput": {
               "data": ["array", "of", "subpipeline", "input", "values"]
            }
        },
        "subPipelineId": "<alg-subPipeline-internal-id>",
   }
}
  • The "subPipeline" object gives a standard stored description of the requested sub pipeline (name and optionally flowInput, options, webhooks).
  • This input is taken from "flowInput", where you plant your subpipeline input in the "data" field.
  • The "subPipelineId" property holds sub pipeline internal Id in algorithm (as the algorithm may start several sub-pipelines).

Event: stopSubPipeline #

If you want to stop a sub-pipeline (Raw or Stored) from your algorithm, use this event.

{
   "command": "stopSubPipeline",
   "data": {
        "subPipelineId": "<alg-subPipeline-internal-id>",
        "reason": "<reason>",
   }
}
  • The "subPipelineId" property holds sub pipeline internal Id in algorithm.
  • The "reason" property enables to put a textual reason for stopping the subpipeline.

How To Implement

Implement #


Hkube communicate with your algorithm via WebSocket (native WebSocket or socketio).
This tutorial explain how to create a websocket client that works with Hkube. You can implement the websocket client in any language. (PR are welcomed)

Connect #

The first thing your algorithm should do is to create a websocket client that listens to: ws://localhost:3000.


  // create ws client and listen to ws://localhost:3000
  this._socket = new WebSocket(this._url);
  this._socket.on('open', () => {
    log.debug("connected");
  });
  

Handle Events #

Here we are registering to events from Hkube.
Each event has a specific handler, as described below.


  this._socket.on('message', (message) => {
    const payload = JSON.parse(message);
    switch (payload.command) {
      case messages.incoming.initialize:
        this._initialize(payload);
        break;
      case messages.incoming.start:
        this._start();
        break;
      case messages.incoming.stop:
        this._stop();
        break;
      default:
        log.debug("unknown message payload.command");
      } 
    });

initialize #

The initialize event is the first event that Hkube send to your algorithm.
The payload of this event includes the pipeline data and the input for your algorithm.
You need to store the input in local variable for later use.

same input as written in the descriptor


  _initialize(payload) {
    this._input = payload.data.input;  // store the input
    this._send(messages.outgoing.initialized);  // send ack event
  }

start #

The start event is the second event that Hkube send to your algorithm.
As you can see, at the first step of this handler you need to tell Hkube that your algorithm has started.
Then you let the algorithm do it's work and finally you send the done event with the algorithm result.


  _start() {
    this._send(messages.outgoing.started);   // send ack event
    try {
        // your code goes here...
        const output = await myAlgorithm.process(this._input);  // use the input
        // your code goes here...

        // send response
        this._send(messages.outgoing.done, {
            data: output
        });
    }
    catch (error) {
        this._onError(error); // send error event
    }
}

stop #

Hkube will send this event to your algorithm only if stop request was made by Hkube users.


  _stop() {
    // your code goes here...
    // e.g. myAlgorithm.stop();
    // your code goes here...
    
    this._send(messages.outgoing.stopped);  // send ack event
  }

Reconnect #

Web Sockets are not auto reconnect, so it's important that you will handle connection lose.


    // connect handler        
    _connect() {
        this._socket = new WebSocket(this._url);
        this._socket.on('close', (code, reason) => {
            switch (code) {
                case 1000:
                    log.debug("socket normal closed");
                    break;
                default:
                    this._reconnect();
                    break;
            }
        });
        this._socket.on('error', (e) => {
            switch (e.code) {
                case 'ECONNREFUSED':
                    this._reconnect();
                    break;
                default:
                    log.error("error");
                    break;
            }
        });
    }
    
    // reconnect handler 
    _reconnect() {
      log.debug("socket reconnecting");
      this._socket.removeAllListeners();
      setTimeout(() => {
          this._connect();
      }, this._reconnectInterval);
    }
}

Handle Errors #

It's highly recommended that you will catch any error in your algorithm and send it to Hkube.


  _onError(error) {
    this._send(messages.outgoing.error, {
      error: {
        code: 'Failed',
        message: error.message || error,
        details: error.stackTrace
      }
    });
  }
  

Send Event #

This is a simple handler for send response back to Hkube.


  _send(command, data, error) {
    try {
      this._socket.send(JSON.stringify({ command, data, error }));
    }
    catch (e) {
        log.error(e);
      }
  }
  
Next →Pipelines