twitter_storm package

Submodules

twitter_storm.get_tweets module

twitter_storm.get_tweets.authenticate(rebuild=False)[source]

Return an OAuthHandler after the authentication process. If rebuild is True, any access token and key stored on disk are ignored.

Following https://pythonhosted.org/tweepy/html/auth_tutorial.html?highlight=oauthhandler

Parameters:rebuild (bool) – If True, the authentication process is performed from scratch.
Returns:A tweepy.api.API object ready to use for authentication.
Raises:RuntimeError
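
The effect of the rebuild flag can be illustrated with a minimal, standard-library-only sketch. It is not the module's implementation: load_or_request_token, the JSON cache file and the fake token source are all hypothetical stand-ins for the real OAuth exchange performed through tweepy.

```python
import json
import os
import tempfile


def load_or_request_token(path, request_token, rebuild=False):
    """Return the cached token unless rebuild is True or no cache exists."""
    if not rebuild and os.path.exists(path):
        with open(path) as f:
            return json.load(f)
    token = request_token()  # full authentication from scratch
    with open(path, "w") as f:
        json.dump(token, f)  # store the token for later runs
    return token


# Fake token source used instead of the real OAuth exchange (assumption).
calls = []


def fake_request():
    calls.append(1)
    return {"key": "k%d" % len(calls), "secret": "s"}


cache = os.path.join(tempfile.mkdtemp(), "auth.json")
first = load_or_request_token(cache, fake_request)
second = load_or_request_token(cache, fake_request)  # served from disk
third = load_or_request_token(cache, fake_request, rebuild=True)  # cache ignored
```

Here the second call reuses the stored token, while the third one repeats the whole process because rebuild is True.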
twitter_storm.get_tweets.available_places()[source]

Returns: a dictionary from place names to a dictionary with keys “lat” and “long” for the corresponding latitude and longitude.
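
As an illustration of that shape only, the dictionary might look as follows. The place names and coordinates are illustrative assumptions, not the contents of the actual configuration file, and coordinates() is a hypothetical helper.

```python
# Hypothetical value with the shape described for available_places().
places = {
    "Madrid": {"lat": 40.4168, "long": -3.7038},
    "London": {"lat": 51.5074, "long": -0.1278},
}


def coordinates(places, name):
    """Return (latitude, longitude) for a place name."""
    place = places[name]
    return place["lat"], place["long"]


madrid = coordinates(places, "Madrid")
```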

twitter_storm.get_tweets.example_tweet_processor(status)[source]

Gets the trending topics closest to the location specified.

Parameters:
  • api (tweepy.api.API object) – API object to use for connecting to Twitter.
  • place (str) – encodes a location, following the convention in the configuration file FIXME.
Returns:

A list of dictionaries with keys ‘name’ and ‘query’ corresponding to hashtags for the 10 most trending topics for that location, e.g. [{‘query’: ‘%22Adolfo+Su%C3%A1rez%22’, ‘name’: u’Adolfo Suárez’}, ...]

Launch a Twitter search for tweets matching the trends in the list trends, which has the format returned by get_trending_topics_text().

Parameters:
  • api (tweepy.api.API object) – API object to use for connecting to Twitter.
  • trends – list of dictionaries in the format returned by get_trending_topics_text().
  • count (int) – number of tweets to retrieve per element in trends.
  • popular (bool) – whether to search for popular tweets (result_type ‘popular’ in the queries) or not. This limits the result to a maximum of 15 tweets, ignoring ‘count’, due to what appears to be a bug: https://dev.twitter.com/discussions/19472.
  • tweet_processor – Function that takes an object of the class tweepy.models.Status and returns a dictionary that projects some fields of that object. This way projection may occur at levels deeper than the first level. By default the fields ‘text’, ‘favorite_count’, ‘created_at’ are projected.
  • search_params – Additional keyword arguments to use in the call to api.search for querying Twitter, e.g. lang (“es”, “en”, ...).
Returns:

An iterator of the input list where each dictionary has been extended with a ‘tweets’ field that contains the result of applying tweet_processor to each tweepy.models.Status returned by the corresponding query to twitter. See https://dev.twitter.com/docs/platform-objects/tweets for the meaning of the fields.
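
A custom tweet_processor could look like the following sketch, written for Python 3. The function name is hypothetical, and a SimpleNamespace stands in for tweepy.models.Status so the example runs without tweepy; the attribute names assume the usual Twitter tweet object fields.

```python
from types import SimpleNamespace


def author_and_text_processor(status):
    """Project fields from a status, including one below the first level."""
    return {
        "text": status.text,
        "favorite_count": status.favorite_count,
        # Nested projection: the author's screen name lives inside 'user'.
        "author_screen_name": status.user.screen_name,
    }


# Stand-in for a tweepy.models.Status object (assumption).
fake_status = SimpleNamespace(
    text="hola",
    favorite_count=3,
    user=SimpleNamespace(screen_name="juan"),
)
projected = author_and_text_processor(fake_status)
```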

twitter_storm.get_tweets_bolt module

twitter_storm.places_spout module

twitter_storm.storm module

Implementation of Storm’s multilang protocol that can be used to create Spouts and Bolts in Python. Communication with the parent process is performed using strings corresponding to JSON objects through stdin and stdout, with each message terminated by an “end” line. See readMsg() below for details.
  • As a consequence, no log message should be written to stdout; use stderr or a proper logging mechanism instead.
  • See https://github.com/nathanmarz/storm/wiki/Multilang-protocol for a description of Storm’s multilang protocol. See the book “Getting Started with Storm” -> Chapter 7. Using Non-JVM Languages with Storm for another example in PHP.
Author: Nathan Marz <nathan@nathanmarz.com>
This is a modification of the code from https://github.com/nathanmarz/storm/blob/master/storm-core/src/multilang/py/storm.py with some additional comments by Juan Rodriguez Hortala <juan.rodriguez.hortala@gmail.com>. Only comments were added, the functionality is the same.
class twitter_storm.storm.BasicBolt[source]

Bases: object

Similar to the Bolt class, but the anchoring and ack of each tuple are handled automatically. The same effect can be obtained within the Bolt class with suitable calls to the functions emit(), emitDirect() and ack() in the method process().

initialize(stormconf, context)[source]
process(tuple)[source]
run()[source]
class twitter_storm.storm.Bolt[source]

Bases: object

Roughly equivalent to the IBolt interface http://nathanmarz.github.io/storm/doc-0.7.1/backtype/storm/task/IBolt.html

To define a bolt in Python, write a module that defines a subclass of Bolt and that calls run() for an instance of that class in its __main__.

There is no equivalent to IBolt.prepare(), but:
  • The configuration and context obtained during the Multilang-protocol handshake are passed in the call to initialize().
  • Use the functions emit(), emitDirect(), ack(), fail() and reportError() to emulate the corresponding methods for OutputCollector
  • Use the function log() to log messages in the parent Storm worker log
initialize(stormconf, context)[source]

Similar to IBolt.prepare(). Called when a task for this component is initialized within a worker on the cluster.

Parameters:
process(tuple)[source]

Process a new tuple

Parameters:tuple (storm.Tuple) – Input tuple to process. Use tuple.values to access the list of component values for this tuple
run()[source]

To define a bolt in Python, write a module that defines a subclass of Bolt and that calls run() for an instance of that class in its __main__.
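
The following sketch shows the general shape of such a subclass, with explicit anchoring and ack inside process(). To keep it runnable on its own, Bolt, Tuple, emit() and ack() are minimal stand-ins defined here rather than the ones from twitter_storm.storm; UppercaseBolt is hypothetical, and passing tuple objects as anchors is an assumption about emit().

```python
import collections

# Stand-ins for twitter_storm.storm (assumption); a real component would
# import that module and subclass its Bolt class instead.
Tuple = collections.namedtuple("Tuple", "id component stream task values")
emitted, acked = [], []


def emit(tup, anchors=None):
    emitted.append((tup, [a.id for a in anchors or []]))


def ack(tup):
    acked.append(tup.id)


class Bolt(object):
    def initialize(self, stormconf, context):
        pass

    def process(self, tuple):
        pass


class UppercaseBolt(Bolt):
    """Hypothetical bolt that emits the first value of each tuple upper-cased."""

    def process(self, tup):
        word = tup.values[0]
        emit([word.upper()], anchors=[tup])  # anchor the output to the input
        ack(tup)  # acknowledge the input tuple explicitly


bolt = UppercaseBolt()
bolt.initialize({}, {})
bolt.process(Tuple("t1", "TrendsSpout", "default", 1, ["madrid"]))
```

In a real module the last three lines would be replaced by a call to run() on the instance, guarded by the usual __main__ check.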

class twitter_storm.storm.Spout[source]

Bases: object

Roughly equivalent to the ISpout interface http://nathanmarz.github.io/storm/doc/backtype/storm/spout/ISpout.html

To define a spout in Python, write a module that defines a subclass of Spout and that calls run() for an instance of that class in its __main__.

There is no equivalent to ISpout.open(), but:
  • The configuration and context obtained during the Multilang-protocol handshake are passed in the call to initialize().
  • Use the functions emit(), emitDirect() and reportError() to emulate the corresponding methods for SpoutOutputCollector
  • Use the function log() to log messages in the parent Storm worker log
ack(id)[source]

Equivalent to ISpout.ack()

Parameters:id (str) – messageId of a tuple emitted by this spout which has been fully processed
fail(id)[source]

Equivalent to ISpout.fail()

Parameters:id (str) – messageId of a tuple emitted by this spout which has failed to be fully processed
initialize(conf, context)[source]

Similar to ISpout.open(). Called when a task for this component is initialized within a worker on the cluster.

Parameters:
Examples:
  • Example configuration:

{“topology.tuple.serializer”: “backtype.storm.serialization.types.ListDelegateSerializer”, “topology.workers”: 1, “drpc.worker.threads”: 64, “storm.messaging.netty.client_worker_threads”: 1, “supervisor.heartbeat.frequency.secs”: 5, “topology.executor.send.buffer.size”: 1024, “drpc.childopts”: “-Xmx768m”, “nimbus.thrift.port”: 6627, “storm.zookeeper.retry.intervalceiling.millis”: 30000, “storm.local.dir”: “/tmp/9a198ac9-61e4-406e-95a6-88d5a8d623b9”, “topology.receiver.buffer.size”: 8, “storm.zookeeper.servers”: [“localhost”], “transactional.zookeeper.root”: “/transactional”, “drpc.request.timeout.secs”: 600, “topology.skip.missing.kryo.registrations”: true, “worker.heartbeat.frequency.secs”: 1, “zmq.hwm”: 0, “storm.zookeeper.connection.timeout”: 15000, “java.library.path”: “/usr/local/lib:/opt/local/lib:/usr/lib”, “topology.max.error.report.per.interval”: 5, “storm.messaging.netty.server_worker_threads”: 1, “storm.id”: “test-1-1397736849”, “supervisor.worker.start.timeout.secs”: 120, “zmq.threads”: 1, “topology.acker.executors”: null, “storm.local.mode.zmq”: false, “topology.max.task.parallelism”: null, “storm.zookeeper.port”: 2000, “nimbus.childopts”: “-Xmx1024m”, “worker.childopts”: “-Xmx768m”, “drpc.queue.size”: 128, “storm.zookeeper.retry.times”: 5, “nimbus.monitor.freq.secs”: 10, “storm.cluster.mode”: “local”, “dev.zookeeper.path”: “/tmp/dev-storm-zookeeper”, “drpc.invocations.port”: 3773, “topology.tasks”: null, “storm.zookeeper.root”: “/storm”, “logviewer.childopts”: “-Xmx128m”, “transactional.zookeeper.port”: null, “topology.worker.childopts”: null, “topology.max.spout.pending”: null, “topology.kryo.register”: null, “nimbus.cleanup.inbox.freq.secs”: 600, “storm.messaging.netty.min_wait_ms”: 100, “nimbus.task.timeout.secs”: 30, “topology.sleep.spout.wait.strategy.time.ms”: 1, “topology.optimize”: true, “nimbus.reassign”: true, “storm.messaging.transport”: “backtype.storm.messaging.zmq”, “logviewer.appender.name”: “A1”, “nimbus.host”: “localhost”, “ui.port”: 
8080, “supervisor.slots.ports”: [1, 2, 3], “nimbus.file.copy.expiration.secs”: 600, “supervisor.monitor.frequency.secs”: 3, “ui.childopts”: “-Xmx768m”, “transactional.zookeeper.servers”: null, “zmq.linger.millis”: 0, “topology.error.throttle.interval.secs”: 10, “topology.worker.shared.thread.pool.size”: 4, “topology.executor.receive.buffer.size”: 1024, “topology.spout.wait.strategy”: “backtype.storm.spout.SleepSpoutWaitStrategy”, “task.heartbeat.frequency.secs”: 3, “topology.transfer.buffer.size”: 1024, “storm.zookeeper.session.timeout”: 20000, “topology.stats.sample.rate”: 0.05, “topology.fall.back.on.java.serialization”: true, “supervisor.childopts”: “-Xmx256m”, “topology.enable.message.timeouts”: true, “storm.messaging.netty.max_wait_ms”: 1000, “nimbus.topology.validator”: “backtype.storm.nimbus.DefaultTopologyValidator”, “nimbus.supervisor.timeout.secs”: 60, “topology.disruptor.wait.strategy”: “com.lmax.disruptor.BlockingWaitStrategy”, “storm.messaging.netty.buffer_size”: 5242880, “drpc.port”: 3772, “topology.kryo.factory”: “backtype.storm.serialization.DefaultKryoFactory”, “storm.zookeeper.retry.interval”: 1000, “storm.messaging.netty.max_retries”: 30, “topology.tick.tuple.freq.secs”: 30, “supervisor.enable”: true, “nimbus.task.launch.secs”: 120, “task.refresh.poll.secs”: 10, “topology.message.timeout.secs”: 30, “nimbus.inbox.jar.expiration.secs”: 3600, “topology.state.synchronization.timeout.secs”: 60, “topology.name”: “test”, “supervisor.worker.timeout.secs”: 30, “topology.trident.batch.emit.interval.millis”: 50, “topology.builtin.metrics.bucket.size.secs”: 60, “storm.thrift.transport”: “backtype.storm.security.auth.SimpleTransportPlugin”, “logviewer.port”: 8000, “topology.kryo.decorators”: [], “topology.debug”: true}

  • Example context:

    {“task->component”: {“11”: “__acker”, “10”: “TrendsSpout”, “1”: “TrendsSpout”, “3”: “TrendsSpout”, “2”: “TrendsSpout”, “5”: “TrendsSpout”, “4”: “TrendsSpout”, “7”: “TrendsSpout”, “6”: “TrendsSpout”, “9”: “TrendsSpout”, “8”: “TrendsSpout”}}

nextTuple()[source]

This method is called when Storm requests a new tuple from this spout.

  • Use the functions emit(), emitDirect() and reportError() to emulate the corresponding methods for SpoutOutputCollector
  • Use the function log() to log messages in the parent Storm worker log
run()[source]

To define a spout in Python, write a module that defines a subclass of Spout and that calls run() for an instance of that class in its __main__.
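
A spout subclass that tracks message ids might be sketched as below. Spout and emit() are stand-ins defined locally so the example runs without twitter_storm.storm; CyclingPlacesSpout, its place list and its replay-on-fail policy are hypothetical.

```python
import itertools

emitted = []


def emit(tup, id=None):  # stand-in for storm.emit (assumption)
    emitted.append((tup, id))


class Spout(object):  # stand-in for twitter_storm.storm.Spout (assumption)
    def initialize(self, conf, context):
        pass

    def ack(self, id):
        pass

    def fail(self, id):
        pass

    def nextTuple(self):
        pass


class CyclingPlacesSpout(Spout):
    """Hypothetical spout that cycles over places and replays failed tuples."""

    def initialize(self, conf, context):
        self._places = itertools.cycle(["Madrid", "London"])
        self._next_id = 0
        self._pending = {}  # message id -> place, until acked

    def nextTuple(self):
        place = next(self._places)
        msg_id = str(self._next_id)
        self._next_id += 1
        self._pending[msg_id] = place
        emit([place], id=msg_id)

    def ack(self, id):
        self._pending.pop(id, None)  # fully processed, forget it

    def fail(self, id):
        emit([self._pending[id]], id=id)  # emit the same tuple again


spout = CyclingPlacesSpout()
spout.initialize({}, {})
spout.nextTuple()
spout.nextTuple()
spout.ack("0")
spout.fail("1")
```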

class twitter_storm.storm.Tuple(id, component, stream, task, values)[source]

Bases: object

This class abstracts the tuples sent to a bolt. See https://github.com/nathanmarz/storm/wiki/Multilang-protocol#bolts for details

Variables:
  • id – id of the tuple as string
  • component – id of the component that created this tuple as string
  • stream – id of the stream this tuple was emitted to as string
  • task – id of the task that created this tuple as integer
  • values – list of values this tuple is composed of
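
As a sketch of how such an object relates to the wire format, the snippet below builds a tuple from a JSON message. The Tuple class is a local stand-in with the constructor signature shown above, and the message keys (“comp”, “tuple”, ...) are an assumption taken from the multilang protocol description linked above.

```python
import json


class Tuple(object):  # stand-in mirroring the documented constructor
    def __init__(self, id, component, stream, task, values):
        self.id = id
        self.component = component
        self.stream = stream
        self.task = task
        self.values = values


# Example message in the shape assumed for tuples sent to a bolt.
raw = (
    '{"id": "-6955786537413359385", "comp": "1", "stream": "1", '
    '"task": 9, "tuple": ["snow white", "field2", 3]}'
)
msg = json.loads(raw)
tup = Tuple(msg["id"], msg["comp"], msg["stream"], msg["task"], msg["tuple"])
```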
twitter_storm.storm.ack(tup)[source]

Used to emulate the method OutputCollector.ack()

twitter_storm.storm.emit(*args, **kwargs)[source]

Used to emulate the methods SpoutOutputCollector.emit() and OutputCollector.emit(). Use in Spouts and Bolts to emit a tuple.

Parameters:
  • args – I have identified just a single argument: tup, the list of values corresponding to the tuple to be emitted. The types of these values should be in sync with those declared in the method declareOutputFields() of the ShellSpout or ShellBolt that wraps the Python code calling this function, in the Java program defining the topology.
  • kwargs – some relevant possible values:
    • For Spouts:
      • id: message id for the tuple to emit, for guaranteed message processing
    • For Bolts:
      • anchors: optional, list of ids for anchor tuples to use for this emit
    • For Spouts or Bolts:
      • stream: optional, stream to use to emit this tuple. Default stream 1 is used if not declared
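
To make the role of these keyword arguments concrete, the sketch below builds the kind of message an emit might send to the parent process. build_emit_message is a hypothetical helper, not part of this module, and the message keys are an assumption based on the multilang protocol description.

```python
def build_emit_message(tup, stream=None, id=None, anchors=None, direct_task=None):
    """Build an emit message, including only the optional keys that were given."""
    msg = {"command": "emit", "tuple": tup}
    if id is not None:
        msg["id"] = id  # spouts: message id for guaranteed processing
    if anchors:
        msg["anchors"] = list(anchors)  # bolts: ids of the anchor tuples
    if stream is not None:
        msg["stream"] = stream  # omitted means the default stream
    if direct_task is not None:
        msg["task"] = direct_task  # direct emit to a specific task
    return msg


spout_msg = build_emit_message(["Madrid"], id="42")
bolt_msg = build_emit_message(["Madrid", "trend"], anchors=["t1"], stream="trends")
```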
twitter_storm.storm.emitBolt(tup, stream=None, anchors=[], directTask=None)[source]

Roughly equivalent to the OutputCollector class.

twitter_storm.storm.emitDirect(task, *args, **kwargs)[source]

Used to emulate the methods SpoutOutputCollector.emitDirect() and OutputCollector.emitDirect(). Use in Spouts and Bolts to emit a tuple directly to the task id specified in a keyword argument directTask, which is required. Apart from that it’s equivalent to emit() above.

twitter_storm.storm.emitSpout(tup, stream=None, id=None, directTask=None)[source]

Roughly equivalent to the SpoutOutputCollector class.

twitter_storm.storm.fail(tup)[source]

Used to emulate the method OutputCollector.fail()

twitter_storm.storm.initComponent()[source]
Multilang-protocol handshake:
  1. Read from stdin the setup info from the parent process. This is a JSON object with fields
  2. Send the PID of this python process to the parent process, and create an empty file at the path specified by heartbeatdir.

Returns:a list with two elements: the first is a dictionary with the Storm configuration, and the second is a dictionary with the Storm context
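
The two handshake steps can be sketched as follows, using in-memory streams instead of the real stdin and stdout (Python 3). The handshake function is hypothetical, and the field names “conf”, “context” and “pidDir” are an assumption based on the multilang protocol description, not taken from this module.

```python
import io
import json
import os
import tempfile


def handshake(stdin, stdout):
    """Read the setup message, register this PID, return [conf, context]."""
    lines = []
    for line in stdin:
        line = line.rstrip("\n")
        if line == "end":  # messages are terminated by an "end" line
            break
        lines.append(line)
    setup = json.loads("\n".join(lines))
    pid = os.getpid()
    # Create an empty file named after the PID in the given directory.
    open(os.path.join(setup["pidDir"], str(pid)), "w").close()
    stdout.write(json.dumps({"pid": pid}) + "\nend\n")
    return [setup["conf"], setup["context"]]


pid_dir = tempfile.mkdtemp()
setup = {
    "conf": {"topology.debug": True},
    "context": {"task->component": {"1": "TrendsSpout"}},
    "pidDir": pid_dir,
}
fake_stdin = io.StringIO(json.dumps(setup) + "\nend\n")
fake_stdout = io.StringIO()
conf, context = handshake(fake_stdin, fake_stdout)
```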
twitter_storm.storm.json_decode(x)
twitter_storm.storm.json_encode(x)
twitter_storm.storm.log(msg)[source]

Used to log messages in the parent Storm worker log

twitter_storm.storm.readCommand()[source]
twitter_storm.storm.readMsg()[source]

Read messages sent by parent process. According to the multilang protocol messages end with a single line containing “end” and correspond to JSON objects
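
The framing described here can be sketched with a pair of helpers working on any text stream (Python 3). read_msg and send_msg are hypothetical names for this illustration, not the functions of this module, and an in-memory buffer replaces the pipes to the parent process.

```python
import io
import json


def read_msg(stream):
    """Read lines up to a line containing only "end" and decode them as JSON."""
    lines = []
    while True:
        line = stream.readline()
        if not line:
            raise EOFError("stream closed before an 'end' line")
        line = line.rstrip("\n")
        if line == "end":
            break
        lines.append(line)
    return json.loads("\n".join(lines))


def send_msg(stream, msg):
    """Encode msg as JSON and terminate it with an "end" line."""
    stream.write(json.dumps(msg) + "\nend\n")
    stream.flush()


pipe = io.StringIO()
send_msg(pipe, {"command": "log", "msg": "hello"})
send_msg(pipe, ["1", "2"])
pipe.seek(0)
first = read_msg(pipe)
second = read_msg(pipe)
```

Because the terminator sits on a line of its own, several messages can follow each other on the same stream and are read back one at a time.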

twitter_storm.storm.readTaskIds()[source]
twitter_storm.storm.readTuple()[source]
twitter_storm.storm.reportError(msg)[source]

Used to emulate the methods SpoutOutputCollector.reportError() and OutputCollector.reportError()

twitter_storm.storm.sendMsgToParent(msg)[source]
twitter_storm.storm.sendpid(heartbeatdir)[source]
Multilang-protocol handshake:
Send the PID of this python process to the parent process, and create an empty file at the path specified by heartbeatdir.
twitter_storm.storm.sync()[source]

twitter_storm.trends_bolt module

twitter_storm.twitter_components module

Based on storm.py module from https://github.com/nathanmarz/storm/blob/master/storm-core/src/multilang/py/storm.py, and the examples from https://github.com/apache/incubator-storm/blob/master/examples/storm-starter/multilang/resources/splitsentence.py and http://storm.incubator.apache.org/documentation/Tutorial.html

Packaging: To run a shell component on a cluster, the scripts that are shelled out to must be in the resources/ directory within the jar submitted to the master (https://github.com/nathanmarz/storm/wiki/Multilang-protocol). By default, Maven will look for your project’s resources under src/main/resources (https://maven.apache.org/plugins/maven-resources-plugin/examples/resource-directory.html).

Tested with Python 2.7 and Apache Storm 0.9.0.1

class twitter_storm.twitter_components.GetTweetsBolt[source]

Bases: twitter_storm.twitter_components.TwitterBolt

Assumes each input tuple is of the shape (place, topic_name, query), where query is a Twitter query string for the trending topic topic_name. For each input tuple, Twitter is queried for the most popular tweets, and some fields are projected from each resulting tweet and then emitted; see process() for details about the fields.

In case the Twitter REST API Rate limit is hit, this bolt sleeps for some seconds.

process(tuple)[source]

Must fulfil the following contract expressed in the Java wrapper:

declarer.declare(new Fields(TopologyFields.AUTHOR_SCREEN_NAME, TopologyFields.CREATED_AT,
TopologyFields.FAV_COUNT, TopologyFields.HASHTAGS_TEXTS, TopologyFields.IN_REPLY_TO_SCREEN_NAME, TopologyFields.LANG, TopologyFields.RETWEET_COUNT, TopologyFields.RETWEETED, TopologyFields.SOURCE, TopologyFields.PLACE, TopologyFields.POSSIBLY_SENSITIVE, TopologyFields.TEXT, TopologyFields.TOPIC_NAME));
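
Since the Java declaration fixes the order of the fields, the Python side has to emit its values in that same order. A minimal sketch of one way to enforce this is shown below; the lower-case field names and the to_output_values helper are hypothetical, as the actual values of the TopologyFields constants are not shown here.

```python
# Hypothetical names, listed in the same order as the Java declaration.
FIELD_ORDER = [
    "author_screen_name", "created_at", "fav_count", "hashtags_texts",
    "in_reply_to_screen_name", "lang", "retweet_count", "retweeted",
    "source", "place", "possibly_sensitive", "text", "topic_name",
]


def to_output_values(projected, place, topic_name):
    """Arrange projected tweet fields in the declared order (None if missing)."""
    record = dict(projected, place=place, topic_name=topic_name)
    return [record.get(name) for name in FIELD_ORDER]


values = to_output_values({"text": "hola", "lang": "es"}, "Madrid", "Adolfo Suárez")
```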
class twitter_storm.twitter_components.PlacesSpout[source]

Bases: twitter_storm.storm.Spout

Emit a tuple with a single field for a place as encoded in the module get_tweets, with the frequency specified in the storm configuration passed at initialize()

initialize(conf, context)[source]
nextTuple()[source]

Should adhere to the following contract expressed in the wrapping Java spout

declarer.declare(new Fields(TopologyFields.PLACE));

class twitter_storm.twitter_components.TrendsBolt[source]

Bases: twitter_storm.twitter_components.TwitterBolt

Assumes each input tuple has a single field for the name of a place as encoded in get_tweets. This bolt emits tuples (place, trend name, query) for the trending topics at the coordinates corresponding to that place. In case the Twitter REST API Rate limit is hit, this bolt sleeps for some seconds.

process(tuple)[source]
class twitter_storm.twitter_components.TwitterBolt[source]

Bases: twitter_storm.storm.Bolt

This class extends storm.Bolt, since no ack is handled: we are using an unreliable source with no defined id for the messages. As additional functionality, a tweepy.api.API object ready to use is stored at self._twitter_api during initialization.

NOTE: don’t forget to set up authentication by calling “python2.7 get_tweets.py” __before__ compiling the topology: the auth file has to be included in the jar and copied to the cluster.

initialize(stormconf, context)[source]
twitter_storm.twitter_components.log_tweeter_error(tweep_error, sleep_time=2)[source]
Parameters:
  • tweep_error – Exception dealing with twitter to log to the parent process
  • sleep_time – time in seconds to sleep before continuing the execution
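
The log-then-sleep behaviour used when the rate limit is hit can be sketched as follows. This is not the module's code: log_and_sleep and call_with_retry are hypothetical, RuntimeError stands in for the tweepy exception, and the log and sleep functions are injectable so the example runs instantly.

```python
import sys
import time


def log_and_sleep(error, sleep_time=2, log=None, sleep=time.sleep):
    """Log a Twitter error and pause before execution continues."""
    if log is None:
        # Default to stderr: stdout is reserved for the multilang protocol.
        log = lambda message: sys.stderr.write(message + "\n")
    log("Twitter error, sleeping %s seconds: %s" % (sleep_time, error))
    sleep(sleep_time)


def call_with_retry(action, attempts=3, **kwargs):
    """Call action(), logging and sleeping after each failure."""
    for _ in range(attempts):
        try:
            return action()
        except RuntimeError as error:  # stand-in for the tweepy error
            log_and_sleep(error, **kwargs)
    return None


# Fake action that fails once with a rate-limit error, then succeeds.
logged, slept = [], []
results = iter([RuntimeError("rate limit"), "ok"])


def flaky():
    result = next(results)
    if isinstance(result, Exception):
        raise result
    return result


outcome = call_with_retry(flaky, log=logged.append, sleep=slept.append, sleep_time=2)
```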

Module contents