Streaming API Concepts

Updated on Thu, 2012-04-12 09:53

Protected vs. Public

Only public (non-protected) accounts can create public statuses. Statuses, including replies and mentions, created by a public account are candidates for inclusion in the Streaming API. Statuses created by protected accounts, and all direct messages, are non-public and are currently not available via the Streaming API, but are available on User Streams and Site Streams.

Result Quality

Both the Streaming API and the Search API filter statuses created by a small proportion of accounts based upon status quality metrics, and on some endpoints these statuses are discarded. For example, frequent and repetitious status updates may, in some instances and in combination with other metrics, result in a different status quality score for a given account. Results that are not selected by user id, such as samples and keyword track, are filtered by this status quality metric. Results that are selected by user id (currently only results from the follow predicate) are unfiltered, and all matching statuses pass. If an expected user's statuses are not present in a stream other than a follow-predicate stream, manually cross-check the user against Search results. If the user's statuses are also not returned in Search, you can assume that they will not be returned by streams that do not use the follow predicate.

For more details see: http://help.twitter.com/forums/10713/entries/42646 which states, in part:

In order to keep your search results relevant, Twitter filters search results for quality. Our search results will not include suspended accounts, or accounts that may jeopardize search quality. Material that degrades search relevancy or creates a bad search experience for people using Twitter may be permanently removed.

Result Relevance

The Streaming API results are a superset of the Search API results. The Search API filters and ranks statuses for relevance, and on certain queries its relevance filtering can be quite selective. The Streaming API does not perform any relevance filtering or ranking: all statuses that pass the Result Quality filter are available on the Streaming API.

Authentication

HTTP Basic Authentication or OAuth is required to access the Streaming API methods. For Basic Authentication, a client must provide the credentials of a valid Twitter account; for OAuth, it must sign the request appropriately.

NOTE: Basic Auth will eventually be deprecated on the Streaming API, but we have not planned or announced a timetable for this transition. User Streams and Site Streams require OAuth.

Access and Rate Limiting

All accounts may access the statuses/sample and statuses/filter methods at default access levels. Accounts may also be granted broader data access on these same methods on a case-by-case basis. Access to other methods requires a special arrangement with Twitter. Contact Twitter with your use case, a brief description of your organization, and the requested access level(s).

Each account may create only one standing connection to the Streaming API. Subsequent connections from the same account may cause previously established connections to be disconnected. Excessive connection attempts, regardless of success, will result in an automatic ban of the client's IP address. Continually failing connections will result in your IP address being blacklisted from all Twitter access.

Connecting

To connect to the Streaming API, form an HTTP request and consume the resulting stream for as long as is practical. Our servers will hold the connection open indefinitely, barring server-side error, excessive client-side lag, network hiccups, routine server maintenance or duplicate logins.

While a client can be built around cycling connections, perhaps using curl for transport, the overall reliability will tend to be poor due to operational gotchas. Save curl for debugging and build upon a persistent process that does not require periodic reconnections.

The API expects HTTP Basic auth with your screen name provided as the credential username. Email addresses are not accepted as valid usernames. Some HTTP clients are buggy when challenged with WWW-Authenticate: Basic and always return 401. A work-around for this client issue is to send the Basic auth credentials preemptively, on the first request, instead of waiting for the challenge.

Some HTTP client libraries only return the response body after the connection has been closed by the server. These clients will not work for accessing the Streaming API. You must use an HTTP client that will return response data incrementally. Most robust HTTP client libraries will provide this functionality. The Apache HttpClient will handle this use case, for example.
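A minimal connection sketch using only the JDK's java.net.HttpURLConnection (an assumption; any client that returns data incrementally will do). It sends the Authorization header preemptively on the first request, as the work-around above suggests, and hands back the response body for incremental reading. The class and method names and both timeout values are hypothetical choices, not values prescribed by Twitter.

```java
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

final class StreamConnector {
    /** Builds a Basic auth header value: "Basic " + base64("screenName:password"). */
    static String basicAuthHeader(String screenName, String password) {
        String credentials = screenName + ":" + password; // screen name, not email address
        return "Basic " + Base64.getEncoder()
                .encodeToString(credentials.getBytes(StandardCharsets.UTF_8));
    }

    /**
     * Opens a stream, sending credentials on the first request rather than
     * waiting for a WWW-Authenticate challenge, and returns the response body.
     */
    static InputStream open(String streamUrl, String screenName, String password)
            throws IOException {
        HttpURLConnection conn =
                (HttpURLConnection) URI.create(streamUrl).toURL().openConnection();
        conn.setRequestProperty("Authorization", basicAuthHeader(screenName, password));
        conn.setConnectTimeout(10_000); // assumed value
        conn.setReadTimeout(90_000);    // assumed stall timeout: no data or keep-alive for 90 s
        int status = conn.getResponseCode();
        if (status != 200) {
            conn.disconnect();
            throw new IOException("HTTP error " + status + "; back off before retrying");
        }
        // Consume this incrementally as data arrives; never wait for the server to close it.
        return conn.getInputStream();
    }
}
```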

There are four main reasons your connection may be closed:

  • Duplicate client logins (earlier connections terminated)
  • Hosebird server restarts (code deploys)
  • Lagging connection terminated (client too slow, or insufficient bandwidth)
  • General Twitter network maintenance (load balancer restarts, network reconfigurations, other very rare events)

Once a valid connection drops, reconnect immediately.

When a network error (TCP/IP level) is encountered, back off linearly. Perhaps start at 250 milliseconds and cap at 16 seconds. Network layer problems are generally transitory and tend to clear quickly.

When an HTTP error (> 200) is returned, back off exponentially. Perhaps start with a 10 second wait, double on each subsequent failure, and finally cap the wait at 240 seconds. Consider sending an alert to a human operator after multiple HTTP errors, as there is probably a client configuration issue that is unlikely to be resolved without human intervention. There's not much point in polling any faster in the face of HTTP error codes, and your client may find itself rate limited. Clients that reconnect immediately after an HTTP error, or that do not otherwise back off exponentially, will be automatically rate limited and risk long-term blacklisting. You can test your back-off algorithm by providing a bad password.
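The reconnect guidance above can be captured in a small policy object. This is a sketch only: the class and method names are hypothetical, the constants are the example values from the text, and the caller is assumed to sleep for the returned number of milliseconds before the next connection attempt (and to reconnect with no delay after a valid connection drops).

```java
/** Hypothetical reconnect policy: linear back-off for TCP/IP errors, exponential for HTTP errors. */
final class ReconnectBackoff {
    private static final long NETWORK_STEP_MS = 250;    // linear increment
    private static final long NETWORK_CAP_MS = 16_000;  // linear cap
    private static final long HTTP_START_MS = 10_000;   // first HTTP back-off
    private static final long HTTP_CAP_MS = 240_000;    // exponential cap

    private long networkDelayMs = 0;
    private long httpDelayMs = 0;

    /** Delay before the next attempt after a TCP/IP-level error. */
    long onNetworkError() {
        networkDelayMs = Math.min(networkDelayMs + NETWORK_STEP_MS, NETWORK_CAP_MS);
        return networkDelayMs;
    }

    /** Delay before the next attempt after an HTTP error status. */
    long onHttpError() {
        httpDelayMs = (httpDelayMs == 0) ? HTTP_START_MS : Math.min(httpDelayMs * 2, HTTP_CAP_MS);
        return httpDelayMs;
    }

    /** Call once a connection is established and data is flowing; resets both schedules. */
    void onConnected() {
        networkDelayMs = 0;
        httpDelayMs = 0;
    }
}
```

Keeping the two schedules separate means a brief network blip never inherits the long waits that an HTTP error (likely a configuration problem) has built up.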

The Streaming API service is fairly lenient. Clients are not banned for a few dozen bungled connections here and there. But, if you code anything in a while loop that also doesn't have a sleep, you will eventually be banned for some small number of minutes. If you get banned repeatedly, all access to Twitter will be cut off for an indeterminate period of time.

Test that your client process honors the DNS Time To Live (TTL). Some stacks will cache a resolved address for the duration of the process and will not pick up DNS changes within the prescribed TTL. Such aggressive caching will lead to service disruptions on your client as Twitter shifts load between IP addresses.
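On the JVM, a well-known case of this caching, positive lookups are cached according to the networkaddress.cache.ttl security property; per the JDK documentation, the default is to cache forever when a security manager is installed, and for an implementation-specific period otherwise. A minimal sketch that caps the cache, assuming it runs at startup before any hostname lookup (the JVM may read the property only once) and that 30 seconds suits your deployment; the class name is hypothetical.

```java
import java.security.Security;

final class DnsTtlConfig {
    /**
     * Caps how long the JVM caches successful DNS lookups, in seconds.
     * Call this before the first lookup; choose a value no longer than the record's TTL.
     */
    static void capDnsCache(int seconds) {
        Security.setProperty("networkaddress.cache.ttl", Integer.toString(seconds));
    }
}
```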

Parsing Responses

The Streaming API returns data in JSON format. JSON is compact, and parsing is greatly simplified by the delimited parameter, described below.

Parsing JSON responses from the Streaming API is simple: every object is returned on its own line and ends with a carriage return. Newline characters (\n) may occur in object elements (the text element of a status object, for example), but carriage returns (\r) should not.

Parsers must be tolerant of occasional extra newline characters placed between statuses. These characters are sent as periodic "keep-alive" messages, should the stream of statuses temporarily pause. These keep-alives allow clients and NAT firewalls to determine that the connection is indeed still valid during low volume periods. Parsing may be easier and more efficient with the delimited query parameter, documented below.
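For a stream read without the delimited parameter, a minimal splitter might look like the following. It is a sketch, assuming a java.io.Reader that already decodes the body as UTF-8 (wrap it in a BufferedReader, since this reads one character at a time); the class name is hypothetical. It splits on carriage returns only, so a newline inside an object survives, and it drops the blank keep-alive lines.

```java
import java.io.IOException;
import java.io.Reader;
import java.util.function.Consumer;

final class CrLineSplitter {
    /** Reads until end of stream, passing each complete message to onMessage. */
    static void split(Reader in, Consumer<String> onMessage) throws IOException {
        StringBuilder current = new StringBuilder();
        int c;
        while ((c = in.read()) != -1) {
            if (c == '\r') {
                // trim() drops the '\n' that follows each '\r' and any keep-alive newlines
                String message = current.toString().trim();
                current.setLength(0);
                if (!message.isEmpty()) {
                    onMessage.accept(message);
                }
            } else {
                current.append((char) c);
            }
        }
        // Anything left in current was cut off by the disconnect and is discarded.
    }
}
```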

Streams may also contain status deletion notices. Clients are urged to honor deletion requests and discard deleted statuses immediately. At times, status deletion messages may arrive before the status. Even in this case, the late arriving status should be deleted from your backing store.

JSON

  {"delete":{"status":{"id":1234,"id_str":"1234","user_id":3,"user_id_str":"3"}}}
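One way to honor deletions that arrive before their status is to keep a tombstone for each deleted id. The in-memory store below is a hypothetical stand-in for your backing store; it assumes the id has already been parsed from the message, and a production version would need to expire old tombstones rather than keep them forever.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical store that honors delete notices, including ones that arrive early. */
final class DeletionAwareStore {
    private final Map<Long, String> statusesById = new HashMap<>();
    private final Set<Long> deletedIds = new HashSet<>(); // tombstones

    void onStatus(long id, String rawJson) {
        if (deletedIds.contains(id)) {
            return; // deletion already seen: discard the late-arriving status
        }
        statusesById.put(id, rawJson);
    }

    void onDelete(long id) {
        deletedIds.add(id);
        statusesById.remove(id);
    }

    boolean contains(long id) {
        return statusesById.containsKey(id);
    }
}
```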

Streams may also contain location deletion messages. Clients are urged to honor deletion requests and remove appropriate geolocation information from both the display and your backing store immediately. Note that in some cases the location deletion message may arrive before a tweet that lies within the deletion range arrives. You should still strip the location data.

JSON

  {"scrub_geo":{"user_id":14090452,"user_id_str":"14090452","up_to_status_id":23260136625,"up_to_status_id_str":"23260136625"}}
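Similarly, a client can remember each scrub_geo notice so that statuses arriving after it are stripped as well. This sketch assumes that up_to_status_id is inclusive and that your code removes location fields whenever mustStripLocation returns true; the class and method names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical record of scrub_geo notices: user_id -> highest status id to scrub. */
final class GeoScrubber {
    private final Map<Long, Long> scrubUpTo = new HashMap<>();

    void onScrubGeo(long userId, long upToStatusId) {
        scrubUpTo.merge(userId, upToStatusId, Math::max);
        // A real client would also strip location data from this user's
        // already-stored statuses with id <= upToStatusId.
    }

    /** True if the status's location data must be removed before display or storage. */
    boolean mustStripLocation(long userId, long statusId) {
        Long limit = scrubUpTo.get(userId);
        return limit != null && statusId <= limit;
    }
}
```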

Track streams may also contain limitation notices, where the integer track value is the number of statuses that, since the start of the connection, matched the track predicate but were rate limited.

JSON

  {"limit":{"track":1234}}
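Because the track count is cumulative since the start of the connection, the number of statuses withheld between two notices is the difference between them. A hypothetical tracker, assuming the count has already been parsed from the message and that a new connection starts counting from zero:

```java
/** Hypothetical tracker turning cumulative limit counts into per-notice deltas. */
final class LimitTracker {
    private long lastCumulative = 0;

    /** Returns how many additional statuses were withheld since the previous notice. */
    long onLimit(long cumulativeTrack) {
        long delta = Math.max(0, cumulativeTrack - lastCumulative);
        lastCumulative = cumulativeTrack;
        return delta;
    }

    /** The cumulative count restarts with each connection. */
    void onReconnect() {
        lastCumulative = 0;
    }
}
```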

Additional objects may be introduced into the markup stream in future releases without changing the resource revisions. Ensure that your parser is tolerant of unexpected objects.

The Streaming API presents events in best effort ordering. During periods of instability, statuses and status deletions may arrive significantly out of order.

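An example delimited client loop, sketched in Java. It assumes the delimited parameter is set so that each message is preceded by a line giving its length in bytes, that in is a java.io.DataInputStream over the response body, and that enqueueForMarkupProcessor is your own hand-off method:

  while (true) {
    String lengthLine;
    do {
      lengthLine = in.readLine();           // deprecated, but adequate for the ASCII length line
      if (lengthLine == null) return;       // stream closed: reconnect
    } while (lengthLine.trim().isEmpty());  // skip keep-alive newlines
    byte[] message = new byte[Integer.parseInt(lengthLine.trim())];
    in.readFully(message);                  // block until the whole message has arrived
    enqueueForMarkupProcessor(new String(message, StandardCharsets.UTF_8));
  }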

Collecting and Processing

Repeated experience shows that clients must also plan for considerable growth and considerable temporary spikes in volume. Even at steady state, the top tiers of the Streaming API will produce a lot of data, often more than can reasonably be processed on a single core. A prudent developer will plan for traffic to double every few months, and will test and provision to handle spikes of at least three times current daily peak volumes. Twitter is most useful to your end users during collective events. Plan accordingly.

To prevent latency problems and plan for scale, design your client with decoupled collection, processing and persistence components. The collection component should efficiently handle connecting to the Streaming API, retrieving responses and reconnecting in the event of network failure, and should hand off statuses via an asynchronous queueing mechanism to application-specific processing and persistence components. This component must be isolated from any downstream processing backlog or maintenance; otherwise, data will queue up in the Streaming API, and eventually your client will be disconnected, resulting in data loss.

For example, collect "raw" statuses (that is, not parsed or marshaled into your language's native object format) in one process, and pass each status into a queueing system, rotated flatfile, or database. In a second process, consume statuses from your queue or store of choice, parse them, extract the fields relevant to your application, etc. Consumers of high-volume streams should consider performing markup parsing in a parallel manner as the status volume is approaching the single processor throughput limit of some software stacks. End-to-end stress test your stack.
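A minimal in-process sketch of that split, using a java.util.concurrent.BlockingQueue as the asynchronous hand-off. In production the queue would more likely be an external queueing system, rotated flatfile or database, as described above; the class name, capacity and polling interval here are assumptions.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Hypothetical collector/processor pair decoupled by a bounded queue. */
final class CollectAndProcess {
    private final BlockingQueue<String> queue;
    private volatile boolean done = false;

    CollectAndProcess(int capacity) {
        this.queue = new LinkedBlockingQueue<>(capacity);
    }

    /** Collector side: hand off a raw, unparsed status without blocking the reader. */
    boolean collect(String rawStatus) {
        // false means the backlog is full: alert on it rather than stall the stream
        return queue.offer(rawStatus);
    }

    /** Processor side: parses and persists on its own thread until finish() drains the queue. */
    Thread startProcessor(Consumer<String> process) {
        Thread worker = new Thread(() -> {
            try {
                while (!(done && queue.isEmpty())) {
                    String raw = queue.poll(100, TimeUnit.MILLISECONDS);
                    if (raw != null) {
                        process.accept(raw); // parse, extract fields, persist
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "status-processor");
        worker.start();
        return worker;
    }

    /** Signals that collection has stopped and waits for the backlog to drain. */
    void finish(Thread worker) {
        done = true;
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Using offer() rather than put() keeps the reading thread from ever blocking on a slow consumer, which is exactly the backlog the text warns will otherwise pile up in the Streaming API.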

Quality of Service

The Streaming API Quality of Service (QoS) is:

  • Best-effort
  • Unordered
  • Generally at-least-once

This QoS implies that, on rare occasion and without notice, statuses may be missing from the delivered stream. During routine streaming, statuses will arrive in any order, but will tend to be k-sorted, where k is the number of tweets received in about 3 seconds. Randomly during established connections and upon client reconnect, and especially when using the count parameter, duplicate and non-k-sorted statuses may be delivered. Duplicate messages on a long established stream will tend to be in very short duration bursts and result from operational events within Twitter, such as upstream server rebalancing. Consuming applications must tolerate duplicate statuses, out-of-order statuses and non-status messages.

The Streaming API may disconnect your connection at any time, perhaps multiple times per day. Proper client coding will prevent data loss due to occasional disconnections.
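Since duplicates tend to arrive in short bursts, a bounded window of recently seen status ids is one simple way to tolerate them. A sketch, assuming ids have already been parsed; the class name and window size are hypothetical and should be tuned to your volume. It does not reorder statuses; out-of-order handling is left to the consuming application.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical filter that drops status ids seen within the last maxIds insertions. */
final class RecentIdDeduplicator {
    private final Map<Long, Boolean> recent;

    RecentIdDeduplicator(int maxIds) {
        // Insertion-ordered map that evicts the oldest id once it exceeds maxIds entries.
        this.recent = new LinkedHashMap<Long, Boolean>(16, 0.75f, false) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<Long, Boolean> eldest) {
                return size() > maxIds;
            }
        };
    }

    /** Returns true the first time an id is seen within the window, false for duplicates. */
    boolean firstSighting(long statusId) {
        return recent.put(statusId, Boolean.TRUE) == null;
    }
}
```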

Example

  % curl https://stream.twitter.com/1/statuses/sample.json -uYOUR_TWITTER_USERNAME:YOUR_PASSWORD

Sampling

The statuses/sample feed is sampled from the Firehose stream of public statuses. The sampling algorithm is consistent; no additional data can be gleaned by consuming more than one sampled feed at a given access level or additional feeds at a lower access level. All feeds at a given access level are identical, and all lower access levels are strict subsets of higher access levels. Long-term consumption of duplicate data wastes limited resources and may lead to a ban from the service.

Sampling proportions are subject to continuous unannounced refinement. Our goal is to provide useful low-latency samples without overwhelming clients or incurring excessive delivery cost. In particular, we reserve the right to alter the sampling proportion as overall traffic grows to keep the sampled feeds practical.

The current sampling rate is ~1% of public statuses by default (aka Spritzer), and ~10% of public statuses for the Gardenhose role. The algorithm is exactly as follows:

The status id modulo 100 is taken on each public status, that is, from the Firehose. Modulus value 0 is delivered to Spritzer, and values 0-10 are delivered to Gardenhose. Over a significant period, a 1% and a 10% sample of public statuses is approached. This algorithm, in conjunction with the status id assignment algorithm, will tend to produce a random selection.
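The selection rule can be expressed directly on the status id. This sketch applies the rule exactly as written above; the class name is hypothetical. Taken literally, residues 0 through 10 are 11 of the 100 possible values, so the Gardenhose condition as written selects about 11% of ids; the first condition is always a subset of the second, consistent with lower access levels being strict subsets.

```java
/** Hypothetical helper applying the modulo-100 sampling rule to a public status id. */
final class SampleRule {
    static boolean inSpritzer(long statusId) {
        return statusId % 100 == 0; // residue 0
    }

    static boolean inGardenhose(long statusId) {
        return statusId % 100 <= 10; // residues 0-10 inclusive, as stated in the text
    }
}
```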

Note that these sample percentages are not those of all statuses created, rather these are percentages of the public subset. The proportion of statuses sampled versus all statuses created varies over time as the proportion of public statuses to all statuses varies. This proportion varies with the proportion of Protected accounts and accounts filtered for quality, and their relative status creation rate, versus the total population's status creation rate.

Filter Limiting

The track parameter (keywords) and the location parameter (geo) on the statuses/filter method are rate-limited predicates. The follow (userid) parameter is rate-limited, but it is rare to be rate limited when using a dedicated stream for just the follow parameter. You may find rate limiting issues with the follow parameter if you mix it with the track parameter. Reasonably focused track and location predicates will return all occurrences in the full Firehose stream of public statuses. Overly broad predicates will cause the output to be periodically limited. After the limitation period expires, all matching statuses will once again be delivered, along with a limit message that enumerates the total number of statuses that have been eliminated from the stream since the start of the connection. Limit messages are described in Parsing Responses.

Track streams with identical predicates will produce identical streams. Limiter periodicity is aligned with statuses/sample sampling periodicity; thus broad predicates will produce limited streams that will tend to be a subset of the statuses/sample streams. Creating multiple track queries to gather more statuses than are available in the sampled feeds is likely to be fruitless and may result in automatic banning. Creating multiple track queries to circumvent rate limits may also result in automatic banning. Repeated banning eventually will cause all of your Twitter.com access to be denied.

Updating Filter Predicates

Updating track and follow predicate parameters with low latency and low data loss is possible, but currently requires a bit of programming effort on the client.

  • Reconnect only when you have a change and not on a fixed schedule. Keep changes to an absolute minimum.
  • Upon a change, reconnect immediately if no changes have occurred for some time. For example, reconnect no more than twice every four minutes, or three times per six minutes, or some similar metric. Depending on your requirements and heuristics, many changes can then be applied with nearly no latency, while only some small proportion have to wait for an update window.
  • Connect with your new predicate, wait for the first response, then immediately disconnect the old connection. Keep the window where you are connected twice to an absolute minimum. Sometimes the Streaming API service will disconnect the old connection as it begins to feed the new connection. This step probably requires a multi-threaded development environment, or at the very least, inter-process-communication (IPC) of some sort. But, once this technique is working well, the lost tweets should be practically zero.
  • Reduce your loss window even further by using another user account with default access levels in parallel with your main account at higher access levels. Reconnect on the default access account for every change (within the time limits and other rules above). Every hour or so, reconnect the main stream with whatever deltas have accumulated from the default access stream. This keeps the majority of your feed at a low connection velocity and therefore low data loss, but allows low update latency. Please ensure that this secondary stream is always returning a disjoint result set so as not to waste bandwidth.
  • Some resources and some higher access levels allow the follow predicate to be combined with the count parameter. At the expense of some minor latency, the resulting lookback will completely mask data loss resulting from a reasonable reconnection gap.

If you do all this, you should be able to offer a low-latency user experience with nearly zero data loss.
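The rate rule in the second bullet ("no more than twice every four minutes") amounts to a sliding-window limiter. A sketch with a hypothetical class name; times are passed in as milliseconds so the policy can be tested without a real clock, and the caller is assumed to batch pending predicate changes whenever tryReconnect returns false.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical limiter: at most maxReconnects reconnects in any windowMs span. */
final class ReconnectWindow {
    private final int maxReconnects;
    private final long windowMs;
    private final Deque<Long> recent = new ArrayDeque<>(); // times of recent reconnects

    ReconnectWindow(int maxReconnects, long windowMs) {
        this.maxReconnects = maxReconnects;
        this.windowMs = windowMs;
    }

    /** True if a reconnect at nowMs is allowed; records it if so. */
    boolean tryReconnect(long nowMs) {
        while (!recent.isEmpty() && nowMs - recent.peekFirst() >= windowMs) {
            recent.pollFirst(); // this reconnect has aged out of the window
        }
        if (recent.size() >= maxReconnects) {
            return false; // wait for the window to open; keep accumulating changes
        }
        recent.addLast(nowMs);
        return true;
    }
}
```

For example, new ReconnectWindow(2, 240_000) encodes "twice every four minutes", and new ReconnectWindow(3, 360_000) encodes "three times per six minutes".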

By Language And Country

With some effort, it should be feasible to build a stream representing the vast majority of statuses in a given non-English language or from many non-English speaking countries. The following suggested approach is untested conjecture, and it requires additional access and some non-trivial development effort.

  1. Investigate the newly proposed geo-location data fields and the existing self-reported location fields in statuses.
  2. Determine a set of stop words and other appropriate keywords. Use the track parameter to capture this stream. Track streams provide rate limiting feedback to allow aggressive query tuning right up to the stream's rate limit. Use an account with the restricted track role to allow a sufficient number of keywords and to also allow a larger proportion of statuses to pass should your keywords
  3. Use the results from #2 to determine the set of all targeted users. Heuristically rank them and follow the top 50,000 ids with the follow param on a "shadow" account. Once you throw out low value accounts, 50,000 accounts will probably constitute the vast majority of your target population by status volume. You can also salt your heuristic by the number of followers, available via the REST API, and with other signals of relevance. You will receive all statuses for these accounts, as there is no rate limiting on follow.
  4. Consume the Gardenhose to determine the completeness of the aggregate output of #2 and #3. You can use offline algorithms of arbitrary complexity on this result stream, and also leverage geo-location and self-reported location fields in statuses. Of course, feed back any newly found users into the user set in #3.
  5. Backfill critical missing data using the Search API. If an influential new account starts updating, it's OK not to detect it right away. When you find valuable accounts, you can get a week or so of historical data from the Search API.

Gzip Compression

Gzip compression may reduce the bandwidth needed to process a stream to as small as 1/5th the size of an uncompressed stream. Request a gzipped stream by connecting with the following HTTP header:

  Accept-Encoding: deflate, gzip

Twitter will respond with a gzipped stream.

Note: There are cases where Twitter will not return a compressed stream, even if one was requested. Always check the Content-Encoding header to verify that the stream is actually being compressed. To make sure you get a compressed stream:

  • Make an HTTP/1.1 request.
  • Include a User-Agent header. Any value should be fine.
  • Include a valid Host header.
  • Do not send a Connection: close header.
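Because a compressed stream is not guaranteed, the client should choose its decoder from the Content-Encoding it actually receives. A sketch in Java, assuming the request set "Accept-Encoding: deflate, gzip" and that the header value comes from something like HttpURLConnection.getContentEncoding(); the class name is hypothetical. On low-volume streams, the GZIPInputStream buffering caveat described under Gzip and Java still applies.

```java
import java.io.IOException;
import java.io.InputStream;
import java.util.Locale;
import java.util.zip.GZIPInputStream;
import java.util.zip.InflaterInputStream;

final class ContentDecoding {
    /** Wraps the raw body according to the Content-Encoding the server returned. */
    static InputStream decode(String contentEncoding, InputStream raw) throws IOException {
        if (contentEncoding == null) {
            return raw; // no header: the stream was not compressed
        }
        switch (contentEncoding.trim().toLowerCase(Locale.ROOT)) {
            case "gzip":
                return new GZIPInputStream(raw);
            case "deflate":
                return new InflaterInputStream(raw); // HTTP "deflate" is the zlib format
            case "identity":
                return raw;
            default:
                throw new IOException("Unsupported Content-Encoding: " + contentEncoding);
        }
    }
}
```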

Gzip and EventMachine

The Ruby EventMachine library defaults to sending a Connection: close header, which will suppress gzip encoding. To prevent this, pass :keepalive => true when connecting to the streaming endpoint. EventMachine currently only supports deflate compressed streams, so send an Accept-Encoding: deflate header.

An example request line for an EventMachine integration:

  @http = EventMachine::HttpRequest.new('STREAMING URL').post(:body=>BODY, :head => {"Content-Type" => "application/x-www-form-urlencoded", "Accept-Encoding" => "deflate", "User-Agent" => "USER AGENT"}, :timeout => 90, :keepalive => true) do |client|

Gzip and Java

Java clients which use java.util.zip.GZIPInputStream() and wrap it with a java.io.BufferedReader() to read streaming API data will encounter buffering on low volume streams, since GZIPInputStream's available() method is not suitable for streaming purposes. To fix this, create a subclass of GZIPInputStream() which overrides the available() method. For example:

  import java.io.IOException;
  import java.io.InputStream;
  import java.util.zip.GZIPInputStream;

  final class StreamingGZIPInputStream extends GZIPInputStream {
      private final InputStream wrapped;
      public StreamingGZIPInputStream(InputStream is) throws IOException {
        super(is);
        wrapped = is;
      }

      /**
       * Overrides behavior of GZIPInputStream which assumes we have all the data available
       * which is not true for streaming. We instead rely on the underlying stream to tell us
       * how much data is available.
       *
       * Programs should not count on this method to return the actual number
       * of bytes that could be read without blocking.
       *
       * @return - whatever the wrapped InputStream returns
       * @exception  IOException  if an I/O error occurs.
       */
      @Override
      public int available() throws IOException {
        return wrapped.available();
      }
  }

To use this class, replace the use of GZIPInputStream, as in this example:

  BufferedReader reader = new BufferedReader(new InputStreamReader(new GZIPInputStream(http.getInputStream()), StandardCharsets.UTF_8));

With StreamingGZIPInputStream:

  BufferedReader reader = new BufferedReader(new InputStreamReader(new StreamingGZIPInputStream(http.getInputStream()), StandardCharsets.UTF_8));

Further Reading