Continuous Deployment Traffic Light - Software


This article concludes the Continuous Deployment Traffic Light series, explaining the drivers, settings, and software required to control the traffic light we built in the previous article, Continuous Deployment Traffic Light - Hardware.

Getting ready

After building the traffic light from the previous article, follow the instructions below to write a polling script that controls your traffic light.

How to do it…

Install the Phidget drivers on your computer so that you can connect via the web interface.

http://www.phidgets.com/drivers.php

On OS X, this adds a Phidget entry to your System Preferences (I do not know where it lives on other operating systems, but it should be easy to find). Connect your computer directly to the Ethernet port on the side of the Phidget SBC controller, then find the device in the list of detected PhidgetSBCs. Double-click it to open the web interface.

Find the serial number of the Phidget SBC controller, as you will need it for the management script.

If you attached the Wi-Fi adapter, join the Phidget SBC controller to your wireless network.

Next, install the language package that you prefer (the code below is all written in Python):

http://www.phidgets.com/programming_resources.php

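This is the Python script that Votizen.com uses to monitor CruiseControl (most of it is copied from the example code provided by Phidgets.com). Put this code in a file, replace the {PHIDGET_SERIAL}, {PHIDGET_PASSWORD}, and {URL_TO_CRUISE_CONTROL} placeholders with your own values, and run it from the command line: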

#!/usr/bin/env python

__author__ = 'Matt Snider'
__version__ = '1.0.0'
__date__ = 'May 4 2012'

#Basic imports
from logging import exception
import time, datetime
import httplib2

#Phidget specific imports
from Phidgets.PhidgetException import PhidgetException
from Phidgets.Devices.InterfaceKit import InterfaceKit

def update_state(interfaceKit, **kwargs):
    """
    Updates the state of the Phidget SBC controller, by closing circuits correlating to the states.
    This function should be called after polling. Pass in the following kwargs to activate light:
        error=True - red light
        building=True - yellow light
        good=True - green light
    """
    interfaceKit.setOutputState(0, kwargs.get('error', False))
    interfaceKit.setOutputState(1, kwargs.get('building', False))
    interfaceKit.setOutputState(2, kwargs.get('good', False))

def setup():
    #Create an interfacekit object
    try:
        interfaceKit = InterfaceKit()
    except RuntimeError as e:
        # RuntimeError has no .details attribute, so print the exception itself
        print("Runtime Exception: %s" % e)
        print("Exiting....")
        exit(1)

    #Information Display Function
    def displayDeviceInfo():
        print("|------------|----------------------------------|--------------|------------|")
        print("|- Attached -|-              Type              -|- Serial No. -|-  Version -|")
        print("|------------|----------------------------------|--------------|------------|")
        print("|- %8s -|- %30s -|- %10d -|- %8d -|" % (interfaceKit.isAttached(), interfaceKit.getDeviceName(), interfaceKit.getSerialNum(), interfaceKit.getDeviceVersion()))
        print("|------------|----------------------------------|--------------|------------|")
        print("Number of Digital Inputs: %i" % (interfaceKit.getInputCount()))
        print("Number of Digital Outputs: %i" % (interfaceKit.getOutputCount()))
        print("Number of Sensor Inputs: %i" % (interfaceKit.getSensorCount()))

    #Event Handler Callback Functions
    def interfaceKitAttached(e):
        attached = e.device
        print("InterfaceKit %i Attached!" % (attached.getSerialNum()))

    def interfaceKitDetached(e):
        detached = e.device
        print("InterfaceKit %i Detached!" % (detached.getSerialNum()))

    def interfaceKitError(e):
        source = e.device
        print("InterfaceKit %i: Phidget Error %i: %s" % (source.getSerialNum(), e.eCode, e.description))

    def interfaceKitInputChanged(e):
        source = e.device
        print("InterfaceKit %i: Input %i: %s" % (source.getSerialNum(), e.index, e.state))

    def interfaceKitSensorChanged(e):
        source = e.device
        print("InterfaceKit %i: Sensor %i: %i" % (source.getSerialNum(), e.index, e.value))

    def interfaceKitOutputChanged(e):
        source = e.device
        print("InterfaceKit %i: Output %i: %s" % (source.getSerialNum(), e.index, e.state))

    #Main Program Code
    try:
        interfaceKit.setOnAttachHandler(interfaceKitAttached)
        interfaceKit.setOnDetachHandler(interfaceKitDetached)
        interfaceKit.setOnErrorhandler(interfaceKitError)
        interfaceKit.setOnInputChangeHandler(interfaceKitInputChanged)
        interfaceKit.setOnOutputChangeHandler(interfaceKitOutputChanged)
        interfaceKit.setOnSensorChangeHandler(interfaceKitSensorChanged)
    except PhidgetException as e:
        print("Phidget Exception %i: %s" % (e.code, e.details))
        print("Exiting....")
        exit(1)

    print("Opening phidget object....")

    try:
        # interfaceKit.openPhidget()
        interfaceKit.openRemote('phidgetsbc', serial={PHIDGET_SERIAL}, password='{PHIDGET_PASSWORD}')
    except PhidgetException as e:
        print("Phidget Exception %i: %s" % (e.code, e.details))
        print("Exiting....")
        exit(1)

    print("Waiting for attach....")

    try:
        interfaceKit.waitForAttach(10000)
    except PhidgetException as e:
        print("Phidget Exception %i: %s" % (e.code, e.details))
        try:
            interfaceKit.closePhidget()
        except PhidgetException as e:
            print("Phidget Exception %i: %s" % (e.code, e.details))
            print("Exiting....")
            exit(1)
        print("Exiting....")
        exit(1)
    else:
        displayDeviceInfo()

    update_state(interfaceKit)
    return interfaceKit

interfaceKit = setup()

# polls every 30 seconds; Ctrl-C to stop the process
h = httplib2.Http()
failed = 0  # consecutive failed attempts to talk to the phidget

while True:
    # CruiseControl polling system, replace with whatever you use
    try:
        resp, content = h.request('http://{URL_TO_CRUISE_CONTROL}/dashboard/cctray.xml', 'GET')
    except (IOError, AttributeError):
        exception('unable to communicate with cctray, sleeping for 2 minutes')
        time.sleep(120) # error fetching, try again much later (2 minutes)
        continue

    try:
        if 'activity="Building"' in content:
            update_state(interfaceKit, building=True)
        elif 'lastBuildStatus="Failure"' in content:
            update_state(interfaceKit, error=True)
        else:
            update_state(interfaceKit, good=True)

        failed = 0
    except PhidgetException:
        if (10 == failed):
            raise Exception('Reattachment failed too many times')
        exception('attempting to reattach the phidget')
        time.sleep(120) # error connecting to the phidget, try to reattach after 2 minutes
        interfaceKit = setup()
        failed += 1
    except Exception:
        exception('unknown error, making traffic light red')
        update_state(interfaceKit, error=True)

    print("Last poll succeeded at %s" % datetime.datetime.now())
    time.sleep(30)

print("Closing...")

try:
    interfaceKit.closePhidget()
except PhidgetException as e:
    print("Phidget Exception %i: %s" % (e.code, e.details))
    print("Exiting....")
    exit(1)

print("Done.")
exit(0)

How it works…

The code above is a long-running Python process that polls a CruiseControl XML file every 30 seconds and updates the traffic light based on the state of the build. Most of the code is copied directly from the Phidget SBC controller sample code, except for the update_state function and the portion of the while loop that polls CruiseControl.
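
The script decides which light to show by searching the raw response for substrings. As a hedged alternative (not what the script above does), the same decision could be made by parsing the feed. The sketch below assumes the usual cctray.xml layout, a Projects root containing Project elements with activity and lastBuildStatus attributes; light_for_cctray is a hypothetical helper name, not part of the original script.

```python
import xml.etree.ElementTree as ET

def light_for_cctray(xml_text):
    """Return 'building', 'error', or 'good' for a cctray.xml document.

    Uses the same priority as the polling loop: any project that is
    building wins, then any failed project, otherwise good.
    """
    root = ET.fromstring(xml_text)
    projects = list(root.iter('Project'))
    if any(p.get('activity') == 'Building' for p in projects):
        return 'building'
    if any(p.get('lastBuildStatus') == 'Failure' for p in projects):
        return 'error'
    return 'good'

# Made-up sample feed, for illustration only
SAMPLE = """<Projects>
  <Project name="web" activity="Sleeping" lastBuildStatus="Success"/>
  <Project name="api" activity="Sleeping" lastBuildStatus="Failure"/>
</Projects>"""

print(light_for_cctray(SAMPLE))  # prints: error
```

Because the returned names match the kwargs of update_state, the result could be passed straight through, for example update_state(interfaceKit, **{light_for_cctray(content): True}).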

The interfaceKit instance is an object that wraps the communication layer between the code and the Phidget SBC controller. The setup function instantiates the interfaceKit, registers the event listeners, opens the remote connection, and waits for the device to attach. It is all stock code, except the call to update_state at the end, which turns all of the traffic lights off.

The update_state function takes the interfaceKit instance and optional boolean kwargs (good, building, error) that indicate which light(s) to turn on. Any light whose kwarg is omitted is turned off, so calling it with no kwargs turns every light off. The interfaceKit setOutputState function tells the Phidget SBC controller which output circuit to change, identified by the position where you attached the wires on the side of the controller (first argument), and whether to close or open it (second argument: True = closed circuit, False = open circuit). With the previous article's hardware setup, position 0 is the red light (error=True), 1 is the yellow light (building=True), and 2 is the green light (good=True).
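
To see the mapping without any hardware attached, here is a minimal sketch that runs update_state against a stand-in object. FakeInterfaceKit is a hypothetical class invented for this illustration; it only records what setOutputState was asked to do and is not part of the Phidgets library.

```python
class FakeInterfaceKit(object):
    """Hypothetical stand-in for InterfaceKit that records output states."""

    def __init__(self):
        self.outputs = {0: False, 1: False, 2: False}

    def setOutputState(self, index, state):
        self.outputs[index] = state

def update_state(interfaceKit, **kwargs):
    # Same body as update_state in the script above
    interfaceKit.setOutputState(0, kwargs.get('error', False))     # red
    interfaceKit.setOutputState(1, kwargs.get('building', False))  # yellow
    interfaceKit.setOutputState(2, kwargs.get('good', False))      # green

kit = FakeInterfaceKit()

update_state(kit, building=True)
print(kit.outputs)  # {0: False, 1: True, 2: False} -> only yellow is on

update_state(kit)
print(kit.outputs)  # {0: False, 1: False, 2: False} -> all lights off
```

Every call sets all three outputs, which is why the script never has to switch the previous light off explicitly.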

The while loop keeps running until it hits a critical error, and it attempts to recover whenever possible. The important part is that it polls a URL and scans the response for string matches to decide which light to turn on. This script does occasionally lose connectivity to the Phidget SBC controller, and we have not yet determined whether the problem lies with the controller, our network, or the script itself.
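
The recovery idea can be sketched on its own as a bounded retry. This is an illustration under assumptions rather than the script's exact behavior: the script retries on the next poll, while this version retries immediately after reattaching. DeviceError is a hypothetical stand-in for PhidgetException, and apply_with_reattach, apply_state, and reattach are names made up for the example.

```python
import time

class DeviceError(Exception):
    """Hypothetical stand-in for PhidgetException."""

def apply_with_reattach(apply_state, reattach, max_failures=10,
                        delay=120, sleep=time.sleep):
    """Call apply_state(); on DeviceError wait, reattach, and try again.

    Re-raises the error once max_failures consecutive attempts have failed.
    Returns the number of failures that happened before the call succeeded.
    """
    failures = 0
    while True:
        try:
            apply_state()
            return failures
        except DeviceError:
            failures += 1
            if failures >= max_failures:
                raise
            sleep(delay)   # give the controller or network time to come back
            reattach()     # e.g. rebuild the connection, as setup() does

# Usage: an action that fails twice, then succeeds (no real waiting)
attempts = {'n': 0}

def flaky():
    attempts['n'] += 1
    if attempts['n'] < 3:
        raise DeviceError('not attached')

failures = apply_with_reattach(flaky, reattach=lambda: None,
                               sleep=lambda seconds: None)
print(failures)  # prints: 2
```

Passing sleep in as a parameter keeps the sketch testable without waiting two minutes per retry.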

There’s more…

If there is interest, I will put this script up on GitHub so that people can contribute to it. I suspect that everybody will have a slightly different continuous deployment environment, so there may not be a need. If you have questions or improvements, please do not hesitate to comment.