LEaRN The Lafayette Engagement and Research Network

Creating Observations from Python

Original CGI Document

With JWT authentication

import requests
import datetime

# Only for testing to demonstrate re-authentication
import time

# Per-device ID and key.
JWT_ID = "2694b9e1-ce59-4fd5-b95e-aa1c780e8158"
JWT_KEY = "ecacdce7-7374-408b-997b-5877bf9e37c3"

FOI_ID = "c81a4920-100b-11e7-987b-9b2f50364984"
DS_ID = "1e34fad0-100c-11e7-987b-9b2f50364984"

#AUTH_TTL = datetime.timedelta(minutes=15)
AUTH_TTL = datetime.timedelta(seconds=5)

URL = "https://sensorthings.southcentralus.cloudapp.azure.com/device/api/v1.0/Observations"
URL_AUTH = "https://sensorthings.southcentralus.cloudapp.azure.com/device/api/auth/login"

JSON_TEMPLATE = '''{{"FeatureOfInterest":{{"@iot.id":"{featureOfInterestId}"}},
  "Datastream":{{"@iot.id":"{datastreamId}"}},
  "phenomenonTime":"{phenomenonTime}",
  "parameters":{{{parametersStr}}},
  "result":"{result}"
}}'''

AUTH_TEMPLATE = '''{{"id":"{id}","key":"{key}"}}'''


def jwt_authenticate(token=(None, None)):
    new_token = token
    auth_required = False

    # Figure out if authentication is required, that is: (1) if we have never authenticated (token_timestamp is None);
    #   or (2) token_timestamp is later than or equal to the current time + AUTH_TTL
    token_timestamp = token[1]
    if token_timestamp is None:
        print("Auth token is null, authenticating ...")
        auth_required = True
    else:
        token_expired_after = token_timestamp + AUTH_TTL
        if datetime.datetime.utcnow() >= token_expired_after:
            print("Auth token expired, re-authenticating ...")
            auth_required = True

    if auth_required:
        json = AUTH_TEMPLATE.format(id=JWT_ID, key=JWT_KEY)
        headers = {'Content-Type': 'application/json'}
        r = requests.post(URL_AUTH, headers=headers, data=json)
        print("Auth status code was {0}".format(r.status_code))
        if r.status_code != 200:
            print("ERROR: Authentication failed")
            new_token = (None, None)
        else:
            new_token = (r.json()["token"], datetime.datetime.utcnow())

    return new_token


def post_observation(token,
                     featureOfInterestId,
                     datastreamId,
                     phenomenonTime,
                     result,
                     parameters={}):
    parametersStr = ",".join(['"{k}":"{v}"'.format(k=e[0], v=e[1]) for e in parameters.items()])
    json = JSON_TEMPLATE.format(featureOfInterestId=featureOfInterestId,
                                datastreamId=datastreamId,
                                phenomenonTime=phenomenonTime,
                                result=result,
                                parametersStr=parametersStr)
    print("Posting new data {0}".format(json))
    headers = {'Content-Type': 'application/json',
               'Authorization': "Bearer {token}".format(token=token[0])}
    r = requests.post(URL, headers=headers, data=json)
    print("Status code was {0}".format(r.status_code))
    location = r.headers['Location']
    print("Location: {0}".format(location))

def main():
    jwt_token = (None, None)

    # Example showing first-time authentication
    jwt_token = jwt_authenticate(jwt_token)
    if jwt_token[0] is None:
        print("Unable to authenticate using JWT")
    else:
        post_observation(token=jwt_token,
                         featureOfInterestId=FOI_ID,
                         datastreamId=DS_ID,
                         phenomenonTime=datetime.datetime.now().isoformat(),
                         result="42.23",
                         parameters={"voltage": "23.42", "resistance":"0.134", "DN":"0.543"})

    # Example showing re-use of non-expired auth token
    jwt_token = jwt_authenticate(jwt_token)
    if jwt_token[0] is None:
        print("Unable to authenticate using JWT")
    else:
        post_observation(token=jwt_token,
                         featureOfInterestId=FOI_ID,
                         datastreamId=DS_ID,
                         phenomenonTime=datetime.datetime.now().isoformat(),
                         result="666.666",
                         parameters={"voltage": "2323.3232", "resistance": "0.020303", "DN": "0.42424242"})

    # Example showing expiration of auth token and re-authentication
    time.sleep(10)
    jwt_token = jwt_authenticate(jwt_token)
    if jwt_token[0] is None:
        print("Unable to authenticate using JWT")
    else:
        post_observation(token=jwt_token,
                         featureOfInterestId=FOI_ID,
                         datastreamId=DS_ID,
                         phenomenonTime=datetime.datetime.now().isoformat(),
                         result="23.42",
                         parameters={"voltage": "42.23", "resistance": "0.431", "DN": "0.345"})

if __name__ == '__main__':
    main()

 

 

Initial un-authenticated access

import requests
import datetime

URL = "http://sensorthings.southcentralus.cloudapp.azure.com:8080/device/api/v1.0/Observations"

JSON_TEMPLATE = '''{{"FeatureOfInterest":{{"@iot.id":"{featureOfInterestId}"}},
  "Datastream":{{"@iot.id":"{datastreamId}"}},
  "phenomenonTime":"{phenomenonTime}",
  "parameters":{{{parametersStr}}},
  "result":"{result}"
}}'''

def post_observation(featureOfInterestId,
                     datastreamId,
                     phenomenonTime,
                     result,
                     parameters={}):
    parametersStr = ",".join(['"{k}":"{v}"'.format(k=e[0], v=e[1]) for e in parameters.items()])
    json = JSON_TEMPLATE.format(featureOfInterestId=featureOfInterestId,
                                datastreamId=datastreamId,
                                phenomenonTime=phenomenonTime,
                                result=result,
                                parametersStr=parametersStr)
    print("Posting new data {0}".format(json))
    headers = {'Content-Type': 'application/json'}
    r = requests.post(URL, headers=headers, data=json)
    print("Status code was {0}".format(r.status_code))
    location = r.headers['Location']
    print("Location: {0}".format(location))

def main():
    post_observation(featureOfInterestId="ad5e2ef0-0f5f-11e7-af39-1b96aae3c355",
                     datastreamId="3ba4c2a0-0f60-11e7-af39-1b96aae3c355",
                     phenomenonTime=datetime.datetime.now().isoformat(),
                     result="42.23",
                     parameters={"voltage": "23.42", "resistance":"0.134", "DN":"0.543"})

if __name__ == '__main__':
    main()