Stream processor

Stream processor flow

Example 1 - Delimited

Endpoint: /stream

HTTP method: POST

HTTP Header:

Content-Type: application/json

HTTP POST body: JSON object or array of objects per the schema below

{
    "data": String,
    "dataformat": {
        "format": "delimited",
        "fields": Object,
    },
    "delimiter": Character,
    "quotechar": Character,
    "timeformat": String,
    "timezone": String,
    "batch_id": String
}
  • data: Delimited file content, UTF-8 encoded

  • format: Set to the string “delimited”

  • fields: A JSON object with the following keys:
    1. timestamp: ISO 8601 string or Unix epoch

    2. longitude: decimal in WGS84 / EPSG 4326 datum

    3. latitude: decimal in WGS84 / EPSG 4326 datum

    4. device_id: Used to group timeseries records. A device ID is required only if your data contains GPS records from two or more devices. Geo filter functions view the data as a continuous timeseries to detect anomalies; without a device ID, data from multiple devices is grouped into one series, causing erroneous filtering.

    Values for the keys are specified as an array [column number, width]. Column numbers are 0-indexed, e.g.:

    {
        timestamp: [0, 99],  // timestamp is the first field. Width is ignored for delimited files.
        longitude: [2, 99],  // longitude is field 3
        latitude: [3, 99],   // latitude is field 4
        device_id: [9, 99]   // device id is field 10
    }

    // Additional fields in the file - fields 5 to 9 - will be ignored.
    
  • delimiter: Delimiter character, e.g., “,”

  • quotechar: Quote character for delimited files. Defaults to a double quote.

  • timeformat: “iso” or “unix” to denote ISO 8601 or Unix epoch seconds, respectively

  • timezone: Optional. The API converts all timestamps to UTC; the value in this field is used to preserve the original timezone info. Timezone list

  • batch_id: Optional. Used to tag/query records submitted in a batch.

Download delimited example delimited.csv

import json
import requests

headers = {'Content-Type': 'application/json'}

with open('delimited.csv', mode='rt', encoding='utf-8') as f:
    data = f.read()

payload = {
    "data": data,
    "dataformat": {
        "format": "delimited",
        "fields": {
            "timestamp": [1, 99],
            "longitude": [4, 99],
            "latitude": [3, 99],
            "device_id": [2, 99]
        }
    },
    "delimiter": ",",
    "quotechar": '"',
    "timeformat": "iso",
    "batch_id": "batch-1"
}

api_url = 'http://<server public ip>/stream'

r = requests.post(api_url, data=json.dumps(payload), headers=headers)
>>> print(r.status_code)
202
>>> print(r.text)
{
   "queue_id": "9de1a91e-6c94-46a4-88fa-ceca3b0b0a9c_35b82dcb-ef04-48f2-bc28-e4e12fb51c59",
   "status": 202
}
>>> print(r.headers['location'])
http://<server public ip>/stream/queue/9de1a91e-6c94-46a4-88fa-ceca3b0b0a9c_35b82dcb-ef04-48f2-bc28-e4e12fb51c59
  • A successful POST returns HTTP 202 Accepted as the response code.

  • The response body or the location header gives you the queue ID.

  • The queued message is processed after a delay of 15 minutes or so. See Debounce

  • You can check the status of a message in the queue using GET /stream/queue/<queue_id>
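
As a rough sketch, you can poll the queue endpoint until the message reaches its final processed status, reusing the location header from the POST response above:

import time
import requests

queue_url = r.headers['location']  # queue status URL from the POST response above

# Poll until the queue processor reports the final status.
while requests.get(queue_url).json()['status'] != 'processed':
    time.sleep(60)  # check once a minute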

Depending on Debounce and the number of records in the queue, it may take 15-30 minutes or more for the records to be parsed, cleaned and geocoded. You can then download the cleaned records as geospatial timeseries data using any of the API methods below:

1. Processed stream as geospatial timeseries JSON using GET /stream

import json
import requests

batch_id = requests.get('http://<public server IP>/stream/queue/9de1a91e-6c94-46a4-88fa-ceca3b0b0a9c_35b82dcb-ef04-48f2-bc28-e4e12fb51c59').json()['batch_id']

params = {'batch_id': batch_id}

r = requests.get('http://<public server IP>/stream', params=params)
with open('stream_delimited.json', 'wt', encoding='utf-8') as f:
    f.write(r.text)

See the response stream_delimited.json

2. Processed stream as a CSV file using GET /stream/csv

import json
import requests

batch_id = requests.get('http://<public server IP>/stream/queue/9de1a91e-6c94-46a4-88fa-ceca3b0b0a9c_35b82dcb-ef04-48f2-bc28-e4e12fb51c59').json()['batch_id']

params = {'batch_id': batch_id}

r = requests.get('http://<public server IP>/stream/csv', params=params)
with open('stream_delimited.csv', 'wt', encoding='utf-8') as f:
    f.write(r.text)

See the response stream_delimited.csv

3. Processed stream segmented into trip segments using GET /trips

import json
import requests

batch_id = requests.get('http://<public server IP>/stream/queue/9de1a91e-6c94-46a4-88fa-ceca3b0b0a9c_35b82dcb-ef04-48f2-bc28-e4e12fb51c59').json()['batch_id']

params = {'batch_id': batch_id, 'embed': 'address' }

r = requests.get('http://<public server IP>/trips', params=params)
with open('trips_delimited.json', 'wt', encoding='utf-8') as f:
    f.write(r.text)

See the response trips_delimited.json

Example 2 - Fixed width

Endpoint: /stream

HTTP method: POST

HTTP Header:

Content-Type: application/json

HTTP POST body: JSON object or array of objects per the schema below

{
    "data": String,
    "dataformat": {
        "format": "fixed",
        "fields": Object,
    },
    "timeformat": String,
    "timezone": String,
    "batch_id": String
}
  • data: Fixed width file content, UTF-8 encoded

  • format: Set to the string “fixed”

  • fields: A JSON object with the following keys:
    1. timestamp: ISO 8601 string or Unix epoch

    2. longitude: decimal in WGS84 / EPSG 4326 datum

    3. latitude: decimal in WGS84 / EPSG 4326 datum

    4. device_id: Used to group timeseries records. A device ID is required only if your data contains GPS records from two or more devices. Geo filter functions view the data as a continuous timeseries to detect anomalies; without a device ID, data from multiple devices is grouped into one series, causing erroneous filtering.

    Values for the keys are specified as an array [start column number, column width]. Here the column number is the character position in the fixed width file, with the first character starting at 0.

    {
        timestamp: [20, 120],  // timestamp field starts at character or column 20, width 120 chars.
        longitude: [410, 30],
        latitude: [300, 30],
        device_id: [200, 20]
    }
    
  • timeformat: “iso” or “unix” to denote ISO 8601 or Unix epoch seconds, respectively

  • timezone: Optional. The API converts all timestamps to UTC; the value in this field is used to preserve the original timezone info. Timezone list

  • batch_id: Optional. Used to tag/query records submitted in a batch.

Download fixed width example fixed.txt

import json
import requests

headers = {'Content-Type': 'application/json'}

with open('fixed.txt', mode='rt', encoding='utf-8') as f:
    data = f.read()

payload = {
    "data": data,
    "dataformat": {
        "format": "fixed",
        "fields": {
            "timestamp": [20, 120],
            "longitude": [410, 30],
            "latitude": [300, 30],
            "device_id": [200, 20]
        }
    },
    "timeformat": "iso",
    "batch_id": "batch-fixed-2"
}

api_url = 'http://<server public ip>/stream'

r = requests.post(api_url, data=json.dumps(payload), headers=headers)
>>> print(r.status_code)
202
>>> print(r.text)
{
   "queue_id": "68dc3c05-a7a5-4f82-bf74-15046335e77d_7c715e40-658e-4752-a81e-0eba40398034",
   "status": 202
}
>>> print(r.headers['location'])
http://<server public ip>/stream/queue/68dc3c05-a7a5-4f82-bf74-15046335e77d_7c715e40-658e-4752-a81e-0eba40398034
  • A successful POST returns HTTP 202 Accepted as the response code.

  • The response body or the location header gives you the queue ID.

  • The queued message is processed after a delay of 15 minutes or so. See Debounce

  • You can check the status of a message in the queue using GET /stream/queue/<queue_id>

  • The message is fully processed when its status changes to processed

Depending on Debounce and the number of records in the queue, it may take 15-30 minutes or more for the records to be parsed, cleaned and geocoded.

When the status changes to processed, you can download the records as geospatial timeseries data using any of the API methods below:

1. Processed stream as geospatial timeseries JSON using GET /stream

import json
import requests

batch_id = requests.get('http://<public server IP>/stream/queue/68dc3c05-a7a5-4f82-bf74-15046335e77d_7c715e40-658e-4752-a81e-0eba40398034').json()['batch_id']

params = {'batch_id': batch_id}

r = requests.get('http://<public server IP>/stream', params=params)
with open('stream_fixed.json', 'wt', encoding='utf-8') as f:
    f.write(r.text)

See the response stream_fixed.json

2. Processed stream as a CSV file using GET /stream/csv

import json
import requests

batch_id = requests.get('http://<public server IP>/stream/queue/68dc3c05-a7a5-4f82-bf74-15046335e77d_7c715e40-658e-4752-a81e-0eba40398034').json()['batch_id']

params = {'batch_id': batch_id}

r = requests.get('http://<public server IP>/stream/csv', params=params)
with open('stream_fixed.csv', 'wt', encoding='utf-8') as f:
    f.write(r.text)

See the response stream_fixed.csv

3. Processed stream segmented into trip segments using GET /trips

import json
import requests

batch_id = requests.get('http://<public server IP>/stream/queue/68dc3c05-a7a5-4f82-bf74-15046335e77d_7c715e40-658e-4752-a81e-0eba40398034').json()['batch_id']

params = {'batch_id': batch_id, 'embed': 'address' }

r = requests.get('http://<public server IP>/trips', params=params)
with open('trips_fixed.json', 'wt', encoding='utf-8') as f:
    f.write(r.text)

See the response trips_fixed.json

Example 3 - JSON Basic

Endpoint: /stream/json_basic

HTTP method: POST

HTTP Header:

Content-Type: application/json

HTTP POST body: JSON object or array of objects per the schema below

{
    "latitude": Double,       // required
    "longitude": Double,      // required
    "altitude": Double,
    "accuracy": Double,
    "speed": Double,
    "heading": Double,
    "activity_type": String,
    "activity_confidence": {
      "type": Double,
      "minimum": 0,
      "maximum": 100
    },
    "timestamp": ISO-8601 UTC, // required
    "device_id": String
}
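
If you generate records in code rather than reading a file, a minimal sketch of posting an inline array of records (coordinates and device ID are illustrative; only latitude, longitude and timestamp are required):

import json
import requests

headers = {'Content-Type': 'application/json'}

# Two minimal records; device_id matters only when mixing devices.
records = [
    {"latitude": 51.5074, "longitude": -0.1278,
     "timestamp": "2020-09-11T14:00:00.000Z", "device_id": "device-1"},
    {"latitude": 51.5080, "longitude": -0.1290,
     "timestamp": "2020-09-11T14:00:30.000Z", "device_id": "device-1"}
]

r = requests.post('http://<server public ip>/stream/json_basic',
                  data=json.dumps(records), headers=headers)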

Download JSON data example basic.json

import json
import requests

headers = {'Content-Type': 'application/json'}

with open('basic.json', 'rt') as f:
    data = json.load(f)

api_url = 'http://<server public ip>/stream/json_basic'

r = requests.post(api_url, data=json.dumps(data), headers=headers)
>>> print(r.status_code)
202
>>> print(r.text)
{
   "queue_id": "0b32373b-09cb-4a09-b7be-a1c8d9c4d75b_31f7f57c-c324-4739-8703-20f544efc213",
   "status": 202
}
>>> print(r.headers['location'])
http://<server public ip>/stream/queue/0b32373b-09cb-4a09-b7be-a1c8d9c4d75b_31f7f57c-c324-4739-8703-20f544efc213
  • A successful POST returns HTTP 202 Accepted as the response code.

  • The response body or the location header gives you the queue ID.

  • The queued message is processed after a delay of 15 minutes or so. See Debounce

  • You can check the status of a message in the queue using GET /stream/queue/<queue_id>

  • The message is fully processed when its status changes to processed

Depending on Debounce and the number of records in the queue, it may take 15-30 minutes or more for the records to be parsed, cleaned and geocoded.

When the status changes to processed, you can download the records as geospatial timeseries data using any of the API methods below:

1. Processed stream as geospatial timeseries JSON using GET /stream

import json
import requests

params = {'batch_id': 'Q7KwEQNiAUI_2020-09-11T14-12-20.088284'}  # batch_id from GET /stream/queue/<queue_id>

r = requests.get('http://<public server IP>/stream', params=params)
with open('stream_basic.json', 'wt') as f:
    f.write(r.text)

See the response stream_basic.json

2. Processed stream as a CSV file using GET /stream/csv

import json
import requests

params = {'batch_id': 'Q7KwEQNiAUI_2020-09-11T14-12-20.088284'}  # batch_id from GET /stream/queue/<queue_id>

r = requests.get('http://<public server IP>/stream/csv', params=params)
with open('stream_basic.csv', 'wt') as f:
    f.write(r.text)

See the response stream_basic.csv

3. Processed stream segmented into trip segments using GET /trips

import json
import requests

params = {'batch_id': 'Q7KwEQNiAUI_2020-09-11T14-12-20.088284', 'embed': 'address'}  # batch_id from GET /stream/queue/<queue_id>

r = requests.get('http://<public server IP>/trips', params=params)
with open('trips_basic.json', 'wt') as f:
    f.write(r.text)

See the response trips_basic.json

Example 4 - JSON Rnbg

React Native Background Geolocation by Transistorsoft (Rnbg) is a popular battery-efficient location tracking module for iOS and Android. You can configure the Rnbg SDK or demo app to post location data continuously to the geofilter stream endpoint. When building continuous location tracking apps, use Rnbg’s batch mode to group multiple records into a single HTTP request.

Endpoint: /stream/json_rnbg

HTTP method: POST

HTTP Header:

Content-Type: application/json

HTTP POST body: JSON object per the schema below

{
    "imei": [String],   // if given, geofilter will use this as the device ID.
    "location": {       // this is an array of location objects in rnbg batch mode
        "coords": {
            "latitude":   Double,
            "longitude":  Double,
            "accuracy":   Double,
            "speed":      Double,
            "heading":    Double,
            "altitude":   Double
        },
        "extras": {   // <-- optional meta-data
            "foo": "bar"
        },
        "activity": {
            "type": String,        // still|on_foot|walking|running|in_vehicle|on_bicycle|unknown
            "confidence": Double   // 0-100%
        },
        "geofence": {  // <-- present only if a geofence was triggered at this location
            "identifier": String,
            "action": String       // ENTER|EXIT
        },
        "battery": {
            "level": Double,
            "is_charging": Boolean
        },
        "timestamp": ISO-8601 UTC, // e.g., "2015-05-05T04:31:54.123Z"
        "uuid":      String,       // <-- universally unique identifier
        "event":     String,       // <-- motionchange|geofence|heartbeat
        "is_moving": Boolean,      // <-- the motion-state when recorded
        "odometer":  Double        // meters
    }
}
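
For reference, a hand-built batch-mode payload might look like the sketch below; the values are illustrative, and in practice the Rnbg SDK constructs and posts this body for you:

import json
import requests

headers = {'Content-Type': 'application/json'}

# In batch mode, "location" holds an array of location objects.
payload = {
    "location": [
        {
            "coords": {"latitude": 51.5074, "longitude": -0.1278,
                       "accuracy": 5.0, "speed": 1.2,
                       "heading": 90.0, "altitude": 12.0},
            "timestamp": "2015-05-05T04:31:54.123Z",
            "is_moving": True,
            "odometer": 1250.0
        }
    ]
}

r = requests.post('http://<server public ip>/stream/json_rnbg',
                  data=json.dumps(payload), headers=headers)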

Download JSON data example rnbg.json

import json
import requests

headers = {'Content-Type': 'application/json'}

with open('rnbg.json', 'rt') as f:
    data = json.load(f)

api_url = 'http://<server public ip>/stream/json_rnbg'

r = requests.post(api_url, data=json.dumps(data), headers=headers)
>>> print(r.status_code)
202
>>> print(r.text)
{
   "queue_id": "b119738f-a14b-4a39-b47d-0a19e4ed2ff9_55ae48cd-bdf8-4c83-8e8d-530dbf60ead9",
   "status": 202
}
>>> print(r.headers['location'])
http://<server public ip>/stream/queue/b119738f-a14b-4a39-b47d-0a19e4ed2ff9_55ae48cd-bdf8-4c83-8e8d-530dbf60ead9
  • A successful POST returns HTTP 202 Accepted as the response code.

  • The response body or the location header gives you the queue ID.

  • The queued message is processed after a delay of 15 minutes or so. See Debounce

  • You can check the status of a message in the queue using GET /stream/queue/<queue_id>

  • The message is fully processed when its status changes to processed

Depending on Debounce and the number of records in the queue, it may take 15-30 minutes or more for the records to be parsed, cleaned and geocoded.

When the status changes to processed, you can download the records as geospatial timeseries data using any of the API methods below:

1. Processed stream as geospatial timeseries JSON using GET /stream

import json
import requests

params = {'batch_id': 'PXNP2l7pUBs_2020-09-10T12-48-17.634733'}  # batch_id from GET /stream/queue/<queue_id>

r = requests.get('http://<public server IP>/stream', params=params)
with open('stream_rnbg.json', 'wt') as f:
    f.write(r.text)

See the response stream_rnbg.json

2. Processed stream as a CSV file using GET /stream/csv

import json
import requests

params = {'batch_id': 'PXNP2l7pUBs_2020-09-10T12-48-17.634733'}  # batch_id from GET /stream/queue/<queue_id>

r = requests.get('http://<public server IP>/stream/csv', params=params)
with open('stream_rnbg.csv', 'wt') as f:
    f.write(r.text)

See the response stream_rnbg.csv

3. Processed stream segmented into trip segments using GET /trips

import json
import requests

params = {'batch_id': 'PXNP2l7pUBs_2020-09-10T12-48-17.634733', 'embed': 'address'}  # batch_id from GET /stream/queue/<queue_id>

r = requests.get('http://<public server IP>/trips', params=params)
with open('trips_rnbg.json', 'wt') as f:
    f.write(r.text)

See the response trips_rnbg.json

API methods

POST /stream

Endpoint: /stream

HTTP method: POST

HTTP Header:

Content-Type: application/json

HTTP POST body: JSON object or array of objects per the schema below

{
    "data": String,
    "dataformat": {
        "format": String,
        "fields": {
            "timestamp": Array[Integer, Integer],
            "longitude": Array[Integer, Integer],
            "latitude": Array[Integer, Integer],
            "device_id": Array[Integer, Integer]
        },
    },
    "delimiter": Character,
    "quotechar": Character,
    "timeformat": String,
    "timezone": String,
    "batch_id": String
}
  • data: File content, UTF-8 encoded

  • format: String. Allowed values are:
    • “delimited”: Data contains delimited file content.

    • “fixed”: Data contains fixed width file content.

    • “json_basic”: Data is in JSON Basic format.

    • “json_rnbg”: Data is in JSON Rnbg format.

  • fields: A JSON object with the following keys; required only for delimited or fixed width format data.
    1. timestamp: [column, width]

    2. longitude: [column, width]

    3. latitude: [column, width]

    4. device_id: [column, width]

    • Values for the keys are specified as an array [column number, width].

    • Column numbers start at 0.

    // Fixed width example
    // column index starts at 0.
    {
        timestamp: [20, 120], //timestamp starts at column 20, with a width of 120 chars.
        longitude: [410, 30], //longitude starts at column 410, with a width of 30 chars.
        latitude: [300, 30],  //latitude  starts at column 300, with a width of 30 chars.
        device_id: [200, 20]  //device id starts at column 200, with a width of 20 chars.
    }
    
    // Delimited example
    {
        timestamp: [0, 99], //timestamp is in column 0, width is ignored.
        longitude: [4, 99], //longitude is in column 4, width is ignored.
        latitude: [3, 99],  //latitude  is in column 3, width is ignored.
        device_id: [2, 99]  //device id is in column 2, width is ignored.
    }
    
  • delimiter: Delimiter character, e.g., “,”

  • quotechar: Quote character for delimited files. Defaults to a double quote.

  • timeformat: “iso” or “unix” to denote ISO 8601 or Unix epoch seconds, respectively

  • timezone: Optional. The API converts all timestamps to UTC; the value in this field is used to preserve the original timezone info. Timezone list

  • batch_id: Optional. Used to tag/query records submitted in a batch.

POST response: HTTP 202 Accepted. The response body gives the queue_id.

{
    "queue_id": "<queue_id>",
    "status": 202
}

See example post requests for delimited and fixed formats.

POST /stream/json_basic

This is a helper method for posting to /stream with JSON Basic format. See the example

POST /stream/json_rnbg

This is a helper method for posting to /stream with JSON Rnbg format. See the example

GET /stream/queue/<queue_id>

Check the status of messages in the queue.

processed is the final status, after which you can download the cleaned data.

Example response.

{
    "batch_id": "PXNP2l7pUBs_2020-09-10T12-48-17.634733",
    "end_time": "2020-09-10T12:48:21.420224+00:00",
    "records_processed": 3352,
    "start_time": "2020-09-10T12:48:17.634432+00:00",
    "status": "parsed"
}

GET /stream

Retrieve processed, cleaned data as a geospatial time series sorted on timestamp.

Parameters allowed in the HTTP GET request.

  1. include_filtered: Default false. Set to true to include filtered-out records (outliers, static navigation errors, etc.) in the output. The output file will then contain the boolean fields below:
    • filter_time: Invalid timestamp, e.g., a timestamp in the future due to clock error or satellite fix issues.

    • filter_accuracy: GPS accuracy less than 250 metres. This filter is applied when accuracy is supplied in the input data, as with the JSON Rnbg format.

    • filter_static: Static navigation errors.

    • filter_jumps: Jumps caused by GPS signal reflection or satellite fix issues.

  2. start_time: ISO 8601 timestring. Retrieve records with timestamp >= start_time.

  3. end_time: ISO 8601 timestring. Retrieve records with timestamp <= end_time.

  4. batch_id: Filter records based on batch_id, retrieved using GET /stream/queue/<queue_id>

  5. device_id: Filter records based on device_id, if supplied in the input data.

  6. activity: Filter records based on activity type. Used with JSON Rnbg format.

  7. trip_id: Filter records based on trip_id.
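
For example, to retrieve one device’s records in a time window with filtered-out records included (parameter values are illustrative):

import requests

params = {
    'device_id': 'device-1',               # records for one device
    'start_time': '2020-09-10T00:00:00Z',  # timestamp >= start_time
    'end_time': '2020-09-11T00:00:00Z',    # timestamp <= end_time
    'include_filtered': 'true'             # keep outliers, with filter_* flags set
}

r = requests.get('http://<public server IP>/stream', params=params)
records = r.json()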

HTTP response: HTTP 200.

JSON data with the fields below:

Field                 Description
--------------------  ------------------------------
id                    global record id
row_id                record id within a batch
timestamp             ISO 8601 timestamp (UTC)
tz                    original timezone
batch_id
trip_id
device_id
latitude              decimal, EPSG 4326
longitude             decimal, EPSG 4326
altitude
heading
speed
activity
activity_confidence
accuracy
filter_time           true if time in future
filter_accuracy       true if accuracy < 250 m
filter_static         true if static error
filter_jumps          true if GPS jumps
address               OpenStreetMap GeoJSON

GET /stream/csv

Helper method to retrieve the GET /stream JSON as a flattened CSV file.
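
For example, the CSV output can be loaded straight into a pandas DataFrame for analysis; a sketch assuming pandas is installed and using a placeholder batch_id:

import io

import pandas as pd
import requests

r = requests.get('http://<public server IP>/stream/csv',
                 params={'batch_id': '<batch_id>'})
# 'timestamp' column name per the GET /stream field list above.
df = pd.read_csv(io.StringIO(r.text), parse_dates=['timestamp'])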

GET /trips

Retrieve processed geospatial timeseries data grouped into journey or trip records, with trip start and end times, trip distance, and the trip route as a polyline/GeoJSON.

Parameters allowed in the HTTP GET request:
  • trip_id: Retrieve only specified trip_id.

  • device_id: Retrieve trip records for a specific device.

  • batch_id: Retrieve trip records for a batch_id.

  • embed: Specify embed=address to return OpenStreetMap address records for the trip start and end points.

See an example HTTP request for trips

HTTP response: HTTP 200.

An array of JSON objects with the fields below:

Field             Description
----------------  ------------------------------
id                trip id
device_id
start_time        ISO 8601 timestamp (UTC)
end_time          ISO 8601 timestamp (UTC)
timezone_offset   original timezone
metres            trip distance in metres
route_geojson     route as a GeoJSON LineString
route_polyline    route as a Google polyline
address           OSM GeoJSON start/end address
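
For instance, route_polyline can be decoded back into coordinates; a sketch assuming the third-party polyline package (any Google polyline decoder works):

import polyline  # pip install polyline
import requests

trips = requests.get('http://<public server IP>/trips',
                     params={'batch_id': '<batch_id>'}).json()

for trip in trips:
    coords = polyline.decode(trip['route_polyline'])  # list of (lat, lon) tuples
    print(trip['id'], len(coords), 'points')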

POST /tasks/schedules

Update task schedule and debounce delay.

Endpoint: /tasks/schedules

HTTP method: POST

HTTP Header:

Content-Type: application/json

HTTP POST body: JSON object or array of objects per the schema below

{
    "debounce": Integer,    // how many seconds to wait since the last data upload.
    "crontab": String       // a valid Linux crontab entry
}

Example POST body

{
    "debounce": 60,            // debounce delay of 60 seconds, default value.
    "crontab": "*/2 * * * *"   // run every 2 mins, default value.
}
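
A minimal sketch of posting this schedule update (the values shown are the defaults):

import json
import requests

headers = {'Content-Type': 'application/json'}

payload = {
    'debounce': 60,           # wait 60 seconds after the last data upload
    'crontab': '*/2 * * * *'  # run the cleaner task every 2 minutes
}

r = requests.post('http://<server public ip>/tasks/schedules',
                  data=json.dumps(payload), headers=headers)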

GET /tasks/schedules

Retrieve scheduled background task details. An example response is below:

{
    "args": "()",
    "coalesce": "True",         // tasks are merged into one when their schedules overlap
    "id": "cleaner_master",     // master task that initiates filter/clean tasks.
    "kwargs": "{'enqueue': True, 'debounce': 60}",     // debounce: time in seconds to wait since the last data upload before kicking off tasks.
    "max_instances": "1",
    "misfire_grace_time": "900",
    "name": "cleaner_master",
    "next_run_time": "2020-09-15 23:30:00+00:00",       // next scheduled run time. Default: every 2 mins.
    "trigger": "cron[month='*', day='*', day_of_week='*', hour='*', minute='*/2']"   // crontab entry for the task. Default: every 2 mins.
}

Debounce

Filter functions work best when applied to a sufficiently large batch of geospatial data. For example, trip segmentation works by iterating through the dataset and identifying gaps (or segments) between groups of records. If the segmentation task runs while you are uploading a large batch of records, the task will fail to segment the dataset accurately.
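
As a toy illustration of the idea (not the service’s actual implementation), records sorted by timestamp can be split into trips wherever the gap between consecutive records exceeds a threshold:

from datetime import datetime, timedelta

def segment(timestamps, gap=timedelta(minutes=10)):
    """Split a sorted list of datetimes into trips at gaps larger than `gap`."""
    trips, current = [], [timestamps[0]]
    for prev, curr in zip(timestamps, timestamps[1:]):
        if curr - prev > gap:
            trips.append(current)
            current = []
        current.append(curr)
    trips.append(current)
    return trips

# Three points one minute apart, then a 58-minute gap: two trips.
ts = [datetime(2020, 9, 10, 12, 0) + timedelta(minutes=m) for m in (0, 1, 2, 60, 61)]
print([len(t) for t in segment(ts)])  # [3, 2]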

To reduce errors, the queue processor task waits for a gap in data uploads before applying filters. This debounce delay is set to 5 mins in the current version.

When you have many files to process, continue to upload them using scp/sftp to the file processor inbox, or use HTTP POST. Filter functions will be triggered when there is at least 5 mins of idle time, usually 5 mins after the last upload. It may take another 15-30 mins or longer to completely process all records, depending on the number of messages to be processed.

To view the settings for the debounce delay and task schedules, use GET /tasks/schedules.

To update the debounce delay and task schedule, use POST /tasks/schedules