Tutorial

Installing the Python Katcp Library

Stable release

To install katcp, run this command in your terminal:

$ pip install katcp

This is the preferred method to install katcp, as it will always install the most recent stable release.

If you don’t have pip installed, the Python installation guide can walk you through the process.

Alternatively,

$ easy_install katcp

Note: This requires the setuptools Python package to be installed.

From sources

The sources for katcp can be downloaded from the GitHub repo.

You can either clone the public repository:

$ git clone https://github.com/ska-sa/katcp-python/

Or download the tarball:

$ curl -OJL https://github.com/ska-sa/katcp-python/tarball/master

Once you have a copy of the source, you can install it with:

$ python setup.py install

Usage

Using the Blocking Client

The blocking client is the most straightforward way of querying a KATCP device. It is used as follows:

from katcp import BlockingClient, Message

device_host = "www.example.com"
device_port = 5000

client = BlockingClient(device_host, device_port)
client.start()
client.wait_protocol() # Optional

reply, informs = client.blocking_request(
    Message.request("help"))

print reply
for msg in informs:
    print msg

client.stop()
client.join()

After creating the BlockingClient instance, the start() method is called to launch the client thread. The wait_protocol() method waits until katcp version information has been received from the server, allowing the KATCP version spoken by the server to be known; server protocol information is stored in client.protocol_flags. Once you have finished with the client, stop() can be called to request that the thread shut down. Finally, join() is used to wait for the client thread to finish.

While the client is active, the blocking_request() method can be used to send messages to the KATCP server and wait for replies. If a reply is not received within the allowed time, a RuntimeError is raised.

If a reply is received, blocking_request() returns two values. The first is the Message containing the reply. The second is a list of messages containing any KATCP informs associated with the reply.
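The timeout behaviour described above can be wrapped in a small helper. This is only a minimal sketch and not part of katcp: request_or_none() and the _StubClient stand-in are hypothetical names introduced here so that the snippet runs without a live device. With a real BlockingClient you would pass the client object and a Message instead.

```python
def request_or_none(client, msg):
    """Return (reply, informs), or None if no reply arrived in time.

    Assumes only what the text above states: blocking_request() returns a
    (reply, informs) pair and raises RuntimeError on timeout.
    """
    try:
        return client.blocking_request(msg)
    except RuntimeError:
        return None


class _StubClient(object):
    """Hypothetical stand-in for a BlockingClient, for demonstration only."""

    def __init__(self, fail=False):
        self.fail = fail

    def blocking_request(self, msg):
        if self.fail:
            raise RuntimeError("Request %s timed out." % msg)
        return ("!%s ok" % msg, ["#%s detail" % msg])


result = request_or_none(_StubClient(), "help")
timed_out = request_or_none(_StubClient(fail=True), "help")
```

Returning None keeps the calling code free of try/except blocks, at the cost of hiding the reason for the failure; re-raising may be preferable when the caller needs to distinguish timeouts from other errors.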

Using the Callback Client

For situations where one wants to communicate with a server but doesn’t want to wait for a reply, the CallbackClient is provided:

from katcp import CallbackClient, Message

device_host = "www.example.com"
device_port = 5000

def reply_cb(msg):
    print "Reply:", msg

def inform_cb(msg):
    print "Inform:", msg

client = CallbackClient(device_host, device_port)
client.start()

# The reply and informs are delivered to the callbacks, not returned here
client.callback_request(
    Message.request("help"),
    reply_cb=reply_cb,
    inform_cb=inform_cb,
)

client.stop()
client.join()

Note that the reply_cb() and inform_cb() callback functions are both called inside the client’s event-loop thread so should not perform any operations that block. If needed, pass the data out from the callback function to another thread using a Queue.Queue or similar structure.
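The queue hand-off suggested above might look like the following sketch. It uses only the standard library; the thread that calls reply_cb() here is a stand-in for the client's event-loop thread, and the message string is made up for illustration.

```python
import threading

try:
    import queue            # Python 3
except ImportError:
    import Queue as queue   # Python 2

replies = queue.Queue()


def reply_cb(msg):
    # Runs in the client's event-loop thread: hand the message over and
    # return immediately instead of processing it here.
    replies.put(msg)


# Stand-in for the client thread invoking the callback (no device needed).
fake_client_thread = threading.Thread(target=reply_cb, args=("!help ok 3",))
fake_client_thread.start()

# The consuming thread may block safely; the timeout avoids waiting forever.
received = replies.get(timeout=5)
fake_client_thread.join()
```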

Writing your own Client

If neither the BlockingClient nor the CallbackClient provides the functionality you need, then you can sub-class DeviceClient, which is the base class from which both are derived.

DeviceClient has two methods for sending messages: request() and send_message().

Internally, request() calls send_message() to pass messages to the server.

Note

The send_message() method does not return an error code or raise an exception if sending the message fails. Since the underlying protocol is entirely asynchronous, the only means to check that a request was successful is to receive a reply message. One can check that the client is connected before sending a message using is_connected().
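A connection check before sending could be sketched as follows. The helper request_if_connected() and the _StubClient class are hypothetical and exist only to make the snippet runnable on its own; the only client methods assumed are is_connected() and request(), both mentioned in this section.

```python
def request_if_connected(client, msg):
    """Send msg only if the client reports a connection.

    Returns True if the message was handed to the client, False otherwise.
    True does not mean the request succeeded; only a reply confirms that.
    """
    if not client.is_connected():
        return False
    client.request(msg)
    return True


class _StubClient(object):
    """Hypothetical stand-in for a DeviceClient, for demonstration only."""

    def __init__(self, connected):
        self.connected = connected
        self.sent = []

    def is_connected(self):
        return self.connected

    def request(self, msg):
        self.sent.append(msg)


online = _StubClient(connected=True)
offline = _StubClient(connected=False)
sent_online = request_if_connected(online, "watchdog")
sent_offline = request_if_connected(offline, "watchdog")
```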

When the DeviceClient thread receives a completed message, handle_message() is called. The default handle_message() implementation calls one of handle_reply(), handle_inform() or handle_request() depending on the type of message received.

Note

Sending requests to clients is discouraged. handle_request() is provided mostly for completeness and to deal with unforeseen circumstances.

Each of handle_reply(), handle_inform() and handle_request() dispatches messages to methods based on the message name. For example, a reply message named foo will be dispatched to reply_foo(). Similarly an inform message named bar will be dispatched to inform_bar(). If no corresponding method is found then one of unhandled_reply(), unhandled_inform() or unhandled_request() is called.

Your own client may hook into this dispatch tree at any point by implementing or overriding the appropriate methods.
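To make the dispatch rule concrete, here is a toy illustration of name-based dispatch. It is not katcp's implementation: DispatchSketch and its simplified method signatures are invented for this sketch, and the mapping of "-" to "_" is an assumption that mirrors how ?pick-fruit is served by request_pick_fruit() in the server example.

```python
class DispatchSketch(object):
    """Toy illustration of name-based dispatch; not katcp's implementation."""

    def __init__(self):
        self.log = []

    def handle_reply(self, name, arguments):
        # Look up reply_<name>; fall back to unhandled_reply if it is missing.
        # Mapping "-" to "_" is an assumption made so that names such as
        # "pick-fruit" yield valid Python method names.
        method = getattr(self, "reply_" + name.replace("-", "_"), None)
        if method is None:
            return self.unhandled_reply(name, arguments)
        return method(arguments)

    def reply_help(self, arguments):
        self.log.append(("help", arguments))

    def unhandled_reply(self, name, arguments):
        self.log.append(("unhandled", name))


sketch = DispatchSketch()
sketch.handle_reply("help", ["ok", "3"])      # reply_help exists
sketch.handle_reply("watchdog", ["ok"])       # no reply_watchdog method
```

Adding a reply_watchdog() method to the class would be enough to route the second message to it, which is the sense in which a subclass hooks into the dispatch tree.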

An example of a simple client that only handles replies to help messages is presented below:

from katcp import DeviceClient, Message
import time

device_host = "www.example.com"
device_port = 5000

class MyClient(DeviceClient):

    def reply_help(self, msg):
        """Print out help replies."""
        print msg.name, msg.arguments

    def inform_help(self, msg):
        """Print out help inform messages."""
        meth, desc = msg.arguments[:2]
        print "---------", meth, "---------"
        print
        print desc
        print "----------------------------"

    def unhandled_reply(self, msg):
        """Print out unhandled replies."""
        print "Unhandled reply", msg.name

    def unhandled_inform(self, msg):
        """Print out unhandled informs."""
        print "Unhandled inform", msg.name


client = MyClient(device_host, device_port)
client.start()

client.request(Message.request("help"))
client.request(Message.request("watchdog"))

time.sleep(0.5)

client.stop()
client.join()

Client handler functions can use the unpack_message() decorator from the kattypes module to unpack messages into function arguments in the same way the request() decorator is used in the server example below, except that the req parameter is omitted.

Using the high-level client API

The high-level client API inspects a KATCP device server and presents requests as method calls and sensors as objects.

A high-level client for the example server presented in the following section looks like this:

import tornado.gen

from tornado.ioloop import IOLoop
from katcp import resource_client

ioloop = IOLoop.current()

client = resource_client.KATCPClientResource(dict(
    name='demo-client',
    address=('localhost', 5000),
    controlled=True))

@tornado.gen.coroutine
def demo():
    # Wait until the client has finished inspecting the device
    yield client.until_synced()
    help_response = yield client.req.help()
    print "device help:\n ", help_response
    add_response = yield client.req.add(3, 6)
    print "3 + 6 response:\n", add_response
    # By not yielding we are not waiting for the response
    pick_response_future = client.req.pick_fruit()
    # Instead we wait for the fruit.result sensor status to change to
    # nominal. Before we can wait on a sensor, a strategy must be set:
    client.sensor.fruit_result.set_strategy('event')
    # If the condition does not occur within the timeout (default 5s), we will
    # get a TimeoutException
    yield client.sensor.fruit_result.wait(
        lambda reading: reading.status == 'nominal')
    fruit = yield client.sensor.fruit_result.get_value()
    print 'Fruit picked: ', fruit
    # And see how the ?pick-fruit request responded by yielding on its future
    pick_response = yield pick_response_future
    print 'pick response: \n', pick_response
    # Finally stop the ioloop so that the program exits
    ioloop.stop()

# Note, katcp.resource_client.ThreadSafeKATCPClientResourceWrapper can be used to
# turn the client into a 'blocking' client for use in e.g. ipython. It will turn
# all functions that return tornado futures into blocking calls, and will bounce
# all method calls through the ioloop. In this case the ioloop must be started
# in a separate thread. katcp.ioloop_manager.IOLoopManager can be used to manage
# the ioloop thread.

ioloop.add_callback(client.start)
ioloop.add_callback(demo)
ioloop.start()

Writing your own Server

Creating a server requires sub-classing DeviceServer. This class already provides all the requests and inform messages required by the KATCP protocol. However, its implementation requires a little assistance from the subclass in order to function.

A very simple server example looks like:

import threading
import time
import random

from katcp import DeviceServer, Sensor, ProtocolFlags, AsyncReply
from katcp.kattypes import (Str, Float, Timestamp, Discrete,
                            request, return_reply)

server_host = ""
server_port = 5000

class MyServer(DeviceServer):

    VERSION_INFO = ("example-api", 1, 0)
    BUILD_INFO = ("example-implementation", 0, 1, "")

    # Optionally set the KATCP protocol version and features. Defaults to
    # the latest implemented version of KATCP, with all supported optional
    # features
    PROTOCOL_INFO = ProtocolFlags(5, 0, set([
        ProtocolFlags.MULTI_CLIENT,
        ProtocolFlags.MESSAGE_IDS,
    ]))

    FRUIT = [
        "apple", "banana", "pear", "kiwi",
    ]

    def setup_sensors(self):
        """Setup some server sensors."""
        self._add_result = Sensor.float("add.result",
            "Last ?add result.", "", [-10000, 10000])

        self._time_result = Sensor.timestamp("time.result",
            "Last ?time result.", "")

        self._eval_result = Sensor.string("eval.result",
            "Last ?eval result.", "")

        self._fruit_result = Sensor.discrete("fruit.result",
            "Last ?pick-fruit result.", "", self.FRUIT)

        self.add_sensor(self._add_result)
        self.add_sensor(self._time_result)
        self.add_sensor(self._eval_result)
        self.add_sensor(self._fruit_result)

    @request(Float(), Float())
    @return_reply(Float())
    def request_add(self, req, x, y):
        """Add two numbers"""
        r = x + y
        self._add_result.set_value(r)
        return ("ok", r)

    @request()
    @return_reply(Timestamp())
    def request_time(self, req):
        """Return the current time in seconds since the Unix Epoch."""
        r = time.time()
        self._time_result.set_value(r)
        return ("ok", r)

    @request(Str())
    @return_reply(Str())
    def request_eval(self, req, expression):
        """Evaluate a Python expression."""
        r = str(eval(expression))
        self._eval_result.set_value(r)
        return ("ok", r)

    @request()
    @return_reply(Discrete(FRUIT))
    def request_pick_fruit(self, req):
        """Pick a random fruit."""
        r = random.choice(self.FRUIT + [None])
        if r is None:
            return ("fail", "No fruit.")
        delay = random.randrange(1,5)
        req.inform("Picking will take %d seconds" % delay)

        def pick_handler():
            self._fruit_result.set_value(r)
            req.reply("ok", r)

        self.ioloop.add_callback(
          self.ioloop.call_later, delay, pick_handler)

        raise AsyncReply

    def request_raw_reverse(self, req, msg):
        """
        A raw request handler to demonstrate the calling convention if
        the @request decorator is not used. Reverses the message arguments.
        """
        # msg is a katcp.Message.request object
        reversed_args = msg.arguments[::-1]
        # req.make_reply() makes a katcp.Message.reply using the correct request
        # name and message ID
        return req.make_reply('ok', *reversed_args)


if __name__ == "__main__":

    server = MyServer(server_host, server_port)
    server.start()
    server.join()

Notice that MyServer has three special class attributes VERSION_INFO, BUILD_INFO and PROTOCOL_INFO. VERSION_INFO gives the version of the server API. Many implementations might use the same VERSION_INFO. BUILD_INFO gives the version of the software that provides the device. Each device implementation should have a unique BUILD_INFO. PROTOCOL_INFO is an instance of ProtocolFlags that describes the KATCP dialect spoken by the server. If not specified, it defaults to the latest implemented version of KATCP, with all supported optional features. Using a version different from the default may change server behaviour; furthermore version info may need to be passed to the @request and @return_reply decorators.

The setup_sensors() method registers Sensor objects with the device server. The base class uses this information to implement the ?sensor-list, ?sensor-value and ?sensor-sampling requests. add_sensor() should be called once for each sensor the device should contain. You may create the sensor objects inside setup_sensors() (as done in the example) or elsewhere if you wish.

Request handlers are added to the server by creating methods whose names start with “request_”. These methods take two arguments – the client-request object (abstracts the client socket and the request context) that the request came from, and the request message. Notice that the message argument is missing from the methods in the example. This is a result of the request() decorator that has been applied to the methods.

The request() decorator takes a list of KatcpType objects describing the request arguments. Once the arguments have been checked they are passed in to the underlying request method as additional parameters instead of the request message.

The return_reply decorator performs a similar operation for replies. Once the request method returns a tuple (or list) of reply arguments, the decorator checks the values of the arguments and constructs a suitable reply message.
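The unpack-then-call and check-then-pack steps described above can be illustrated with toy decorators. These are not the katcp.kattypes implementations: unpack_args(), pack_reply() and ToyServer are hypothetical names, plain Python callables such as float stand in for KatcpType objects, and a list of strings stands in for a reply Message.

```python
import functools


def unpack_args(*converters):
    """Toy stand-in for request(): convert raw arguments, then pass them on."""
    def decorator(handler):
        @functools.wraps(handler)
        def wrapper(self, req, msg_arguments):
            if len(msg_arguments) != len(converters):
                raise ValueError("expected %d arguments" % len(converters))
            values = [conv(arg) for conv, arg in zip(converters, msg_arguments)]
            return handler(self, req, *values)
        return wrapper
    return decorator


def pack_reply(*converters):
    """Toy stand-in for return_reply(): check and format the reply tuple."""
    def decorator(handler):
        @functools.wraps(handler)
        def wrapper(*args, **kwargs):
            result = handler(*args, **kwargs)
            status, values = result[0], result[1:]
            if status != "ok":
                # Failure replies carry free-form text; pass it through.
                return [status] + [str(v) for v in values]
            if len(values) != len(converters):
                raise ValueError("wrong number of reply arguments")
            return [status] + [str(conv(v)) for conv, v in zip(converters, values)]
        return wrapper
    return decorator


class ToyServer(object):
    @unpack_args(float, float)
    @pack_reply(float)
    def request_add(self, req, x, y):
        return ("ok", x + y)


# The raw arguments arrive as strings and leave as strings.
reply = ToyServer().request_add(None, ["3", "6"])
```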

Use of the request() and return_reply() decorators is encouraged but entirely optional.

Message dispatch is handled in much the same way as described in the client example, with the exception that there are no unhandled_request(), unhandled_reply() or unhandled_inform() methods. Instead, the server will log an exception.

Writing your own Async Server

To write a server in the typical tornado async style, modify the example above by adding the following imports:

import signal
import tornado.gen
import tornado.ioloop

from katcp import AsyncDeviceServer

Also replace class MyServer(DeviceServer) with class MyServer(AsyncDeviceServer) and replace the if __name__ == "__main__": block with:

@tornado.gen.coroutine
def on_shutdown(ioloop, server):
    print('Shutting down')
    yield server.stop()
    ioloop.stop()

if __name__ == "__main__":
    ioloop = tornado.ioloop.IOLoop.current()
    server = MyServer(server_host, server_port)
    # Hook up to SIGINT so that ctrl-C results in a clean shutdown
    signal.signal(signal.SIGINT, lambda sig, frame: ioloop.add_callback_from_signal(
        on_shutdown, ioloop, server))
    ioloop.add_callback(server.start)
    ioloop.start()

If multiple servers are started in a single ioloop, on_shutdown() should be modified to call stop() on each server. This is needed to allow a clean shutdown that adheres to the KATCP specification requirement that a #disconnect inform is sent when a server shuts down.

Event Loops and Thread Safety

As of version 0.6.0, katcp-python was completely reworked to use Tornado as an event and network library. A typical Tornado application would only use a single tornado.ioloop.IOLoop event-loop instance. Logically independent parts of the application would all share the same ioloop, using e.g. coroutines to allow concurrent tasks.

However, to maintain backwards compatibility with the thread semantics of older versions of this library, it supports starting a tornado.ioloop.IOLoop instance in a new thread for each client or server. Instantiating the BlockingClient or CallbackClient client classes or the DeviceServer server class gives the backwards-compatible behaviour by default. AsyncClient and AsyncDeviceServer instead use tornado.ioloop.IOLoop.current() as the ioloop by default (this can be overridden using their set_ioloop methods) and do not enable thread safety by default (this can be overridden using AsyncDeviceServer.set_concurrency_options() and AsyncClient.enable_thread_safety()).

Note that any message (request, reply, inform) handling methods should not block. A blocking handler will block the ioloop, causing all timed operations (e.g. sensor strategies), network io, etc. to block. This is particularly important when multiple servers/clients share a single ioloop. A good solution for handlers that need to wait on other tasks is to implement them as Tornado coroutines. A DeviceServer will not accept another request message from a client connection until the request handler has completed / resolved its future. Multiple outstanding requests can be handled concurrently by raising the AsyncReply exception in a request handler. It is then the responsibility of the user to ensure that a reply is eventually sent using the req object.
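The effect of a handler that waits without blocking can be seen in a small event-loop experiment. As an assumption made to keep the snippet dependency-free, it uses Python 3's standard asyncio module rather than Tornado, and the handler() coroutine is a hypothetical stand-in for a request handler; a Tornado coroutine that yields on a future gives control back to the ioloop in an analogous way.

```python
import asyncio


async def handler(name, delay, finished):
    # Awaiting yields control to the event loop, so other handlers and
    # timers keep running while this one waits.
    await asyncio.sleep(delay)
    finished.append(name)


async def main():
    finished = []
    # The slow handler is started first; the fast one still completes first
    # because neither of them blocks the loop.
    await asyncio.gather(
        handler("slow", 0.2, finished),
        handler("fast", 0.01, finished),
    )
    return finished


completion_order = asyncio.run(main())
```

Had the handlers called time.sleep() instead, the fast handler could not have started until the slow one had finished.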

If DeviceServer.set_concurrency_options() has handler_thread=True (the default for DeviceServer; AsyncDeviceServer defaults to False), all requests to a server are serialised and handled in a separate request-handling thread. This allows request handlers to block without preventing sensor strategy updates, providing backwards-compatible concurrency semantics.

In the case of a purely network-event driven server or client, all user code would execute in the thread context of the server or client event loop. Therefore all handler functions must be non-blocking to prevent unresponsiveness. Unhandled exceptions raised by handlers running in the network event-thread are caught and logged; in the case of servers, an error reply including the traceback is sent over the network interface. Slow operations (such as picking fruit) may be delegated to another thread (if a threadsafe server is used), a callback (as shown in the request_pick_fruit handler in the server example) or tornado coroutine.

If a device is linked to processing that occurs independently of network events, one approach would be a model thread running in the background. The KATCP handler code would then defer requests to the model. The model must provide a thread-safe interface to the KATCP code. If using an async server (e.g. AsyncDeviceServer or DeviceServer.set_concurrency_options() called with thread_safe=False), all interaction with the device server needs to be through the tornado.ioloop.IOLoop.add_callback() method of the server’s ioloop. The server’s ioloop instance can be accessed through its ioloop attribute. If a threadsafe server (e.g. DeviceServer with default concurrency options) or client (e.g. CallbackClient) is used, all the public methods provided by this katcp library for sending !replies or #informs are thread safe.
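One simple way to give a model a thread-safe interface is to guard its state with a lock. The sketch below is an illustration only: DeviceModel, its temperature attribute and the accessor names are hypothetical, and a real model would typically run its own update loop in the background thread.

```python
import threading


class DeviceModel(object):
    """Hypothetical model updated by a background thread."""

    def __init__(self):
        self._lock = threading.Lock()
        self._temperature = 0.0

    def set_temperature(self, value):
        # Called from the model thread.
        with self._lock:
            self._temperature = value

    def get_temperature(self):
        # Called from KATCP handler code running in another thread.
        with self._lock:
            return self._temperature


model = DeviceModel()
worker = threading.Thread(target=model.set_temperature, args=(21.5,))
worker.start()
worker.join()
reading = model.get_temperature()
```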

Updates to Sensor objects using the public setter methods are always thread-safe, provided that the same is true for all the observers attached to the sensor. The server observers used to implement sampling strategies are threadsafe, even if an async server is used.

Backwards Compatibility

Server Protocol Backwards Compatibility

A minor modification of the first several lines of the example in Writing your own Server suffices to create a KATCP v4 server:

from katcp import DeviceServer, Sensor, ProtocolFlags, AsyncReply
from katcp.kattypes import (Str, Float, Timestamp, Discrete,
                            request, return_reply)

from functools import partial
import threading
import time
import random

server_host = ""
server_port = 5000

# Bind the KATCP major version of the request and return_reply decorators
# to version 4
request = partial(request, major=4)
return_reply = partial(return_reply, major=4)

class MyServer(DeviceServer):

    VERSION_INFO = ("example-api", 1, 0)
    BUILD_INFO = ("example-implementation", 0, 1, "")

    # Optionally set the KATCP protocol version as 4.
    PROTOCOL_INFO = ProtocolFlags(4, 0, set([
        ProtocolFlags.MULTI_CLIENT,
    ]))

The rest of the example follows as before.
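The functools.partial binding used above can be tried in isolation. In this sketch the request() function is a toy decorator factory standing in for katcp.kattypes.request; it merely records the major version on the handler, and its default of 5 is an assumption made for the illustration.

```python
from functools import partial


def request(*types, **options):
    """Toy decorator factory standing in for katcp.kattypes.request."""
    major = options.get("major", 5)  # default chosen for this sketch only

    def decorator(handler):
        handler.katcp_major = major  # record the version for inspection
        return handler
    return decorator


# Same kind of binding as in the example above: every use gets major=4.
request_v4 = partial(request, major=4)


@request_v4(float, float)
def request_add(x, y):
    return x + y


@request(float, float)
def request_sub(x, y):
    return x - y
```

Rebinding the names request and return_reply at module level, as the server example does, means the decorated handlers below them need no further changes.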

Client Protocol Backwards Compatibility

The DeviceClient client automatically detects the version of the server if it can (see Server KATCP Version Auto-detection). For a simple client this means that no changes are required to support different KATCP versions. However, the semantics of the messages might be different for different protocol versions. Using the unpack_message decorator with major=4 for reply or inform handlers might help here, although it could use some improvement.

In the case of version auto-detection failing for a given server, preset_protocol_flags can be used to set the KATCP version before calling the client’s start() method.