Getting Started with StreamNative Cloud using Python

· tison


StreamNative Cloud is a fully-managed, flexible Pulsar as a service offering built by the original developers of Apache Pulsar.

In this post, you will see how to use StreamNative Cloud as the message queue for a simple Python pubsub application. We will:

  1. Start a Pulsar instance on StreamNative Cloud;
  2. Write the application logic;
  3. Run the pubsub application.

Before You Begin #

Create a StreamNative Free Tier cluster #

  1. Click the Get Started button on the official StreamNative site, or go directly to the console.
  2. Sign up for an account or continue with Google.
  3. After your organization is created automatically, click Create Instance.
  4. Name your instance, choose the "shared-aws" pool, and choose the "Free" cluster type.
  5. After the instance is ready, click the instance name to open the admin page, and then click Create Cluster.
  6. Name your cluster, choose a location near you, scroll down, and click Confirm.

Your StreamNative Cloud cluster will be created in approximately 5 to 10 minutes.

Create a Service Account #

  1. Go to the Service Accounts page.
  2. Create a service account and give it a name.
  3. Download the key file.

You will authenticate with the key file when connecting to the cluster.

Connect to StreamNative Cloud #

  1. Go to the Pulsar Clusters page.
  2. Click Connect To Cluster to see the connection metadata:
    • private_key: the path to the key file.
    • issuer_url: always https://auth.streamnative.cloud (no trailing slash, see also this issue).
    • audience: the instance URI, which starts with urn:sn:pulsar:.
    • broker_service_url: the cluster URL, which starts with pulsar+ssl://.
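
Hand-writing the JSON parameter string makes it easy to slip in a trailing slash or a misplaced quote. As a minimal sketch, the helper below assembles that string from the values listed above and checks the two constraints mentioned there. The names build_oauth2_params and check_broker_url, the key file path, and the audience value are hypothetical placeholders, not part of the Pulsar client.

```python
import json

ISSUER_URL = "https://auth.streamnative.cloud"


def build_oauth2_params(private_key, audience, issuer_url=ISSUER_URL):
    """Return the JSON string to pass to pulsar.AuthenticationOauth2."""
    if issuer_url.endswith("/"):
        raise ValueError("issuer_url must not end with a trailing slash")
    if not audience.startswith("urn:sn:pulsar:"):
        raise ValueError("audience should start with 'urn:sn:pulsar:'")
    return json.dumps({
        "issuer_url": issuer_url,
        "private_key": private_key,
        "audience": audience,
    })


def check_broker_url(url):
    """Return url unchanged if it looks like a TLS broker service URL."""
    if not url.startswith("pulsar+ssl://"):
        raise ValueError("broker_service_url should start with 'pulsar+ssl://'")
    return url


# Placeholder values (assumptions); substitute the ones shown in the console.
params = build_oauth2_params("/path/to/key-file.json",
                             "urn:sn:pulsar:my-org:my-instance")
print(params)
```

The resulting string can be passed as the single argument of pulsar.AuthenticationOauth2 in place of the literal JSON string.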

Run the code snippet below with your parameters:

import pulsar

authentication = pulsar.AuthenticationOauth2('{"issuer_url": "https://auth.streamnative.cloud","private_key": "<path-to-key-file>","audience": "<audience-uri>"}')
client = pulsar.Client('<broker-service-url>', authentication=authentication)
producer = client.create_producer('connectivity')
consumer = client.subscribe('connectivity', 'my-subscription', initial_position=pulsar.InitialPosition.Earliest)

If no errors occur, you can try producing and consuming a message:

producer.send(b'Hello from StreamNative Cloud!')
msg = consumer.receive(1000) # time out after 1 second (1000 ms)
print(f"Received message '{msg.data()}' id='{msg.message_id()}'")

# OUTPUT:
# Received message 'b'Hello from StreamNative Cloud!'' id='(155,0,-1,-1)'
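
The b'...' in the output appears because msg.data() returns raw bytes. As a small sketch, assuming the payload is UTF-8 text, you can decode it before printing. The payload variable here stands in for msg.data() so that the snippet runs without a cluster.

```python
# Stand-in for msg.data(): the bytes that the producer sent.
payload = b'Hello from StreamNative Cloud!'

# Decode the bytes to a str, assuming the producer encoded the text as UTF-8.
text = payload.decode('utf-8')
print(f"Received message '{text}'")

# The reverse direction, for producer.send(): encode a str to bytes.
encoded = 'Hello from StreamNative Cloud!'.encode('utf-8')
```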

Demo: Pubsub GitHub Events with Schema #

  1. Install Requests with: pip3 install requests.
  2. Write a producer application to send events with schema:
# 1. Connect to StreamNative Cloud
import pulsar

authentication = pulsar.AuthenticationOauth2('{"issuer_url": "https://auth.streamnative.cloud","private_key": "<path-to-key-file>","audience": "<audience-uri>"}')
client = pulsar.Client('<broker-service-url>', authentication=authentication)

# 2. Fetch a few recent events of the Apache Pulsar repository with the GitHub REST API.
import requests

# Pulsar is under continuous, rapid development, so the events returned vary over time.
response = requests.get('https://api.github.com/repos/apache/pulsar/events', headers={"Accept": "application/vnd.github+json"})
response.raise_for_status()  # fail early on HTTP errors such as rate limiting
events = response.json()

# 3. Define schema
class Event(pulsar.schema.Record):
    id = pulsar.schema.String(required=True)
    type = pulsar.schema.String(required=True)
    repo_id = pulsar.schema.Long(required=True)
    repo_name = pulsar.schema.String(required=True)
    actor_id = pulsar.schema.Long(required=True)
    actor_login = pulsar.schema.String(required=True)

# 4. Produce events
producer = client.create_producer(topic='events', schema=pulsar.schema.JsonSchema(Event))
for event in events:
    msg = Event(
        id=event['id'],
        type=event['type'],
        repo_id=event['repo']['id'],
        repo_name=event['repo']['name'],
        actor_id=event['actor']['id'],
        actor_login=event['actor']['login'])
    producer.send(msg)

client.close()
  3. Write a consumer application to read events with schema:
# 1. Connect to StreamNative Cloud
import pulsar

authentication = pulsar.AuthenticationOauth2('{"issuer_url": "https://auth.streamnative.cloud","private_key": "<path-to-key-file>","audience": "<audience-uri>"}')
client = pulsar.Client('<broker-service-url>', authentication=authentication)

# 2. Copy schema definition
class Event(pulsar.schema.Record):
    id = pulsar.schema.String(required=True)
    type = pulsar.schema.String(required=True)
    repo_id = pulsar.schema.Long(required=True)
    repo_name = pulsar.schema.String(required=True)
    actor_id = pulsar.schema.Long(required=True)
    actor_login = pulsar.schema.String(required=True)

# 3. Create subscription with schema
consumer = client.subscribe(
    topic='events',
    subscription_name="my-subscription",
    initial_position=pulsar.InitialPosition.Earliest,
    schema=pulsar.schema.JsonSchema(Event))

# 4. Consume events
# receive() raises once no message arrives within the timeout, which ends the loop.
while True:
    try:
        msg = consumer.receive(1000) # time out after 1 second (1000 ms)
        print("Received message '{}' id='{}'".format(msg.value(), msg.message_id()))
        consumer.acknowledge(msg)
    except Exception as e:
        print(f"Consumer failed with error: {e}")
        break

client.close()
  4. Run the producer application first, and then run the consumer application. You should see output like the following:
Received message '{'_required_default': False, '_default': None, '_required': False, 'id': '23125303180', 'type': 'IssueCommentEvent', 'repo_id': 62117812, 'repo_name': 'apache/pulsar', 'actor_id': 25195800, 'actor_login': 'poorbarcode'}' id='(157,0,-1,-1)'
Received message '{'_required_default': False, '_default': None, '_required': False, 'id': '23125296868', 'type': 'PullRequestEvent', 'repo_id': 62117812, 'repo_name': 'apache/pulsar', 'actor_id': 16235121, 'actor_login': 'nodece'}' id='(157,1,-1,-1)'
Received message '{'_required_default': False, '_default': None, '_required': False, 'id': '23125261740', 'type': 'IssueCommentEvent', 'repo_id': 62117812, 'repo_name': 'apache/pulsar', 'actor_id': 84127069, 'actor_login': 'Nicklee007'}' id='(157,2,-1,-1)'
Received message '{'_required_default': False, '_default': None, '_required': False, 'id': '23125143279', 'type': 'IssueCommentEvent', 'repo_id': 62117812, 'repo_name': 'apache/pulsar', 'actor_id': 19296967, 'actor_login': 'lordcheng10'}' id='(157,3,-1,-1)'
Received message '{'_required_default': False, '_default': None, '_required': False, 'id': '23125069052', 'type': 'IssueCommentEvent', 'repo_id': 62117812, 'repo_name': 'apache/pulsar', 'actor_id': 2770146, 'actor_login': 'Jason918'}' id='(157,4,-1,-1)'
Received message '{'_required_default': False, '_default': None, '_required': False, 'id': '23124956693', 'type': 'PullRequestReviewCommentEvent', 'repo_id': 62117812, 'repo_name': 'apache/pulsar', 'actor_id': 2770146, 'actor_login': 'Jason918'}' id='(157,5,-1,-1)'
Received message '{'_required_default': False, '_default': None, '_required': False, 'id': '23124956694', 'type': 'PullRequestReviewEvent', 'repo_id': 62117812, 'repo_name': 'apache/pulsar', 'actor_id': 2770146, 'actor_login': 'Jason918'}' id='(157,6,-1,-1)'
Received message '{'_required_default': False, '_default': None, '_required': False, 'id': '23124928986', 'type': 'PullRequestReviewCommentEvent', 'repo_id': 62117812, 'repo_name': 'apache/pulsar', 'actor_id': 2770146, 'actor_login': 'Jason918'}' id='(157,7,-1,-1)'
Received message '{'_required_default': False, '_default': None, '_required': False, 'id': '23124928946', 'type': 'PullRequestReviewEvent', 'repo_id': 62117812, 'repo_name': 'apache/pulsar', 'actor_id': 2770146, 'actor_login': 'Jason918'}' id='(157,8,-1,-1)'
Received message '{'_required_default': False, '_default': None, '_required': False, 'id': '23124836098', 'type': 'PullRequestReviewCommentEvent', 'repo_id': 62117812, 'repo_name': 'apache/pulsar', 'actor_id': 10999109, 'actor_login': 'zbentley'}' id='(157,9,-1,-1)'
Received message '{'_required_default': False, '_default': None, '_required': False, 'id': '23124836090', 'type': 'PullRequestReviewEvent', 'repo_id': 62117812, 'repo_name': 'apache/pulsar', 'actor_id': 10999109, 'actor_login': 'zbentley'}' id='(157,10,-1,-1)'
Received message '{'_required_default': False, '_default': None, '_required': False, 'id': '23124816765', 'type': 'WatchEvent', 'repo_id': 62117812, 'repo_name': 'apache/pulsar', 'actor_id': 70093548, 'actor_login': 'prdarm'}' id='(157,11,-1,-1)'
Received message '{'_required_default': False, '_default': None, '_required': False, 'id': '23124809334', 'type': 'WatchEvent', 'repo_id': 62117812, 'repo_name': 'apache/pulsar', 'actor_id': 1854121, 'actor_login': 'lihuanghai'}' id='(157,12,-1,-1)'
Received message '{'_required_default': False, '_default': None, '_required': False, 'id': '23124779277', 'type': 'IssueCommentEvent', 'repo_id': 62117812, 'repo_name': 'apache/pulsar', 'actor_id': 25195800, 'actor_login': 'poorbarcode'}' id='(157,13,-1,-1)'
Received message '{'_required_default': False, '_default': None, '_required': False, 'id': '23124764141', 'type': 'PullRequestReviewEvent', 'repo_id': 62117812, 'repo_name': 'apache/pulsar', 'actor_id': 18204803, 'actor_login': 'BewareMyPower'}' id='(157,14,-1,-1)'
Received message '{'_required_default': False, '_default': None, '_required': False, 'id': '23124724608', 'type': 'IssueCommentEvent', 'repo_id': 62117812, 'repo_name': 'apache/pulsar', 'actor_id': 95597048, 'actor_login': 'tjiuming'}' id='(157,15,-1,-1)'
Received message '{'_required_default': False, '_default': None, '_required': False, 'id': '23124720466', 'type': 'PullRequestReviewEvent', 'repo_id': 62117812, 'repo_name': 'apache/pulsar', 'actor_id': 95597048, 'actor_login': 'tjiuming'}' id='(157,16,-1,-1)'
Received message '{'_required_default': False, '_default': None, '_required': False, 'id': '23124666998', 'type': 'IssueCommentEvent', 'repo_id': 62117812, 'repo_name': 'apache/pulsar', 'actor_id': 12592133, 'actor_login': 'codelipenghui'}' id='(157,17,-1,-1)'
Received message '{'_required_default': False, '_default': None, '_required': False, 'id': '23124620502', 'type': 'PullRequestEvent', 'repo_id': 62117812, 'repo_name': 'apache/pulsar', 'actor_id': 26179648, 'actor_login': 'coderzc'}' id='(157,18,-1,-1)'
Received message '{'_required_default': False, '_default': None, '_required': False, 'id': '23124420105', 'type': 'PullRequestEvent', 'repo_id': 62117812, 'repo_name': 'apache/pulsar', 'actor_id': 18204803, 'actor_login': 'BewareMyPower'}' id='(157,19,-1,-1)'
Received message '{'_required_default': False, '_default': None, '_required': False, 'id': '23124407286', 'type': 'CreateEvent', 'repo_id': 62117812, 'repo_name': 'apache/pulsar', 'actor_id': 18204803, 'actor_login': 'BewareMyPower'}' id='(157,20,-1,-1)'
Received message '{'_required_default': False, '_default': None, '_required': False, 'id': '23124361624', 'type': 'PullRequestEvent', 'repo_id': 62117812, 'repo_name': 'apache/pulsar', 'actor_id': 6297296, 'actor_login': 'Technoboy-'}' id='(157,21,-1,-1)'
Received message '{'_required_default': False, '_default': None, '_required': False, 'id': '23124360700', 'type': 'PullRequestEvent', 'repo_id': 62117812, 'repo_name': 'apache/pulsar', 'actor_id': 6297296, 'actor_login': 'Technoboy-'}' id='(157,22,-1,-1)'
Received message '{'_required_default': False, '_default': None, '_required': False, 'id': '23124322292', 'type': 'WatchEvent', 'repo_id': 62117812, 'repo_name': 'apache/pulsar', 'actor_id': 8947816, 'actor_login': 'jiankafei'}' id='(157,23,-1,-1)'
Received message '{'_required_default': False, '_default': None, '_required': False, 'id': '23124284400', 'type': 'IssueCommentEvent', 'repo_id': 62117812, 'repo_name': 'apache/pulsar', 'actor_id': 6297296, 'actor_login': 'Technoboy-'}' id='(157,24,-1,-1)'
Received message '{'_required_default': False, '_default': None, '_required': False, 'id': '23124262048', 'type': 'IssueCommentEvent', 'repo_id': 62117812, 'repo_name': 'apache/pulsar', 'actor_id': 18204803, 'actor_login': 'BewareMyPower'}' id='(157,25,-1,-1)'
Received message '{'_required_default': False, '_default': None, '_required': False, 'id': '23124222335', 'type': 'IssueCommentEvent', 'repo_id': 62117812, 'repo_name': 'apache/pulsar', 'actor_id': 25195800, 'actor_login': 'poorbarcode'}' id='(157,26,-1,-1)'
Received message '{'_required_default': False, '_default': None, '_required': False, 'id': '23124152744', 'type': 'IssueCommentEvent', 'repo_id': 62117812, 'repo_name': 'apache/pulsar', 'actor_id': 50226895, 'actor_login': 'Anonymitaet'}' id='(157,27,-1,-1)'
Received message '{'_required_default': False, '_default': None, '_required': False, 'id': '23124019314', 'type': 'IssueCommentEvent', 'repo_id': 62117812, 'repo_name': 'apache/pulsar', 'actor_id': 41898282, 'actor_login': 'github-actions[bot]'}' id='(157,28,-1,-1)'
Received message '{'_required_default': False, '_default': None, '_required': False, 'id': '23124009068', 'type': 'IssueCommentEvent', 'repo_id': 62117812, 'repo_name': 'apache/pulsar', 'actor_id': 41898282, 'actor_login': 'github-actions[bot]'}' id='(157,29,-1,-1)'
Consumer failed with error: Pulsar error: TimeOut
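
The final line of the output is expected: once all events have been read, receive() times out and the consumer loop exits. As a hedged sketch, the loop can be wrapped in a function that treats a timeout as a normal end of input and re-raises anything else. It assumes that a timeout surfaces as an exception whose text contains "TimeOut", as in the output above; newer client versions may expose a dedicated exception class instead. FakeConsumer and FakeMessage are hypothetical stand-ins so that the example runs without a cluster.

```python
class FakeMessage:
    """Hypothetical stand-in for a received message."""
    def __init__(self, value):
        self._value = value

    def value(self):
        return self._value


class FakeConsumer:
    """Hypothetical stand-in exposing receive() and acknowledge()."""
    def __init__(self, values):
        self._pending = [FakeMessage(v) for v in values]
        self.acked = []

    def receive(self, timeout_millis=None):
        if not self._pending:
            # Mimics the error text shown in the output above.
            raise Exception("Pulsar error: TimeOut")
        return self._pending.pop(0)

    def acknowledge(self, msg):
        self.acked.append(msg)


def drain(consumer, timeout_millis=1000):
    """Receive and acknowledge messages until receive() times out."""
    values = []
    while True:
        try:
            msg = consumer.receive(timeout_millis)
        except Exception as e:
            # Assumption: a timeout is recognizable by its error text.
            if "TimeOut" in str(e):
                break
            raise
        values.append(msg.value())
        consumer.acknowledge(msg)
    return values


fake = FakeConsumer(["first event", "second event"])
result = drain(fake)
print(result)
```

With a real consumer created by client.subscribe(), the same function would return the decoded Event records instead of strings.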

Running into trouble? #

File a ticket on our user forum, or ask in your native tongue.

Want to learn more? #

Follow @StreamNative on Twitter to stay updated on StreamNative Cloud news!