StreamNative Cloud is a fully-managed, flexible Pulsar as a service offering built by the original developers of Apache Pulsar.
In this blog, you will see how to use StreamNative Cloud as a messaging queue for a Python application. We will be using Python to build a simple pubsub application. We will:
- Start a Pulsar instance on StreamNative Cloud;
- Write the application logic;
- Run the pubsub application.
Before You Begin #
- Install Python 3 (v3.7+).
- Install Pulsar Python Client with:
pip3 install pulsar-client
.
Create a StreamNative Free Tier cluster #
- Click the Get Started button on StreamNative official site or directly go to the console.
- Sign up an account or continue with Google.
- After organization automatically created, click Create Instance.
- Name your instance, choose "shared-aws" pool, and choose "Free" cluster type.
- After instance ready, click the instance name and jump to the admin page, and then Create Cluster.
- Name your cluster, choose a location near to yours, scroll to down and Confirm.
Your StreamNative Cloud cluster will be created in approximately 5 to 10 minutes.
Create a Service Account #
- Go to Service Accounts page.
- Create Service Account with a name.
- Download the key file.
You will authenticate with the key file when connecting to the cluster.
Connect to StreamNative Cloud #
- Go to Pulsar Clusters page.
- Click Connect To Cluster to see connection metadata.
- private_key: path to the key file.
- issues_url: always
https://auth.streamnative.cloud
(no trailing slash, see also this issue). - audience: instance URI, starts with
urn:sn:pulsar:
. - broker_service_url: cluster URL, starts with
pulsar+ssl://
.
Running the code snippet below with your parameters:
1import pulsar
2
3authentication = pulsar.AuthenticationOauth2('{"issuer_url": "https://auth.streamnative.cloud","private_key": "<path-to-key-file>","audience": "<audience-uri>"}')
4client = pulsar.Client('<broker-service-url>', authentication=authentication)
5producer = client.create_producer('connectivity')
6consumer = client.subscribe('connectivity', 'my-subscription', initial_position=pulsar.InitialPosition.Earliest)
If no errors occur, then you can try to produce and consume messages like:
1producer.send(b'Hello from StreamNative Cloud!')
2msg = consumer.receive(1000) # 1 second to timeout
3print(f"Received message '{msg.data()}' id='{msg.message_id()}'")
4
5# OUTPUT:
6# Received message 'b'Hello from StreamNative Cloud!'' id='(155,0,-1,-1)'
Demo: Pubsub GitHub Events with Schema #
- Install Requests with:
pip3 install requests
. - Write a producer application to send events with schema:
1# 1. Connect to StreamNative Cloud
2import pulsar
3
4authentication = pulsar.AuthenticationOauth2('{"issuer_url": "https://auth.streamnative.cloud","private_key": "<path-to-key-file>","audience": "<audience-uri>"}')
5client = pulsar.Client('<broker-service-url>', authentication=authentication)
6
7# 2. Prepare a few events of Apache Pulsar repository with GitHub REST API.
8import requests
9
10# Since Pulsar is continously under rapid development, the events returned can be various from time to time.
11response = requests.get('https://api.github.com/repos/apache/pulsar/events', headers={"Accept": "application/vnd.github+json"})
12events = response.json()
13
14# 3. Define schema
15class Event(pulsar.schema.Record):
16 id = pulsar.schema.String(required=True)
17 type = pulsar.schema.String(required=True)
18 repo_id = pulsar.schema.Long(required=True)
19 repo_name = pulsar.schema.String(required=True)
20 actor_id = pulsar.schema.Long(required=True)
21 actor_login = pulsar.schema.String(required=True)
22
23# 4. Produce events
24producer = client.create_producer(topic='events', schema=pulsar.schema.JsonSchema(Event))
25for event in events:
26 msg = Event(
27 id=event['id'],
28 type=event['type'],
29 repo_id=event['repo']['id'],
30 repo_name=event['repo']['name'],
31 actor_id=event['actor']['id'],
32 actor_login=event['actor']['login'])
33 producer.send(msg)
34
35client.close()
- Write a consumer application to read events with schema:
1# 1. Connect to StreamNative Cloud
2import pulsar
3
4authentication = pulsar.AuthenticationOauth2('{"issuer_url": "https://auth.streamnative.cloud","private_key": "<path-to-key-file>","audience": "<audience-uri>"}')
5client = pulsar.Client('<broker-service-url>', authentication=authentication)
6
7# 2. Copy schema definition
8class Event(pulsar.schema.Record):
9 id = pulsar.schema.String(required=True)
10 type = pulsar.schema.String(required=True)
11 repo_id = pulsar.schema.Long(required=True)
12 repo_name = pulsar.schema.String(required=True)
13 actor_id = pulsar.schema.Long(required=True)
14 actor_login = pulsar.schema.String(required=True)
15
16# 3. Create subscription with schema
17consumer = client.subscribe(
18 topic='events',
19 subscription_name="my-subscription",
20 initial_position=pulsar.InitialPosition.Earliest,
21 schema=pulsar.schema.JsonSchema(Event))
22
23# 4. Consume events
24while True:
25 try:
26 msg = consumer.receive(1000) # 1 second to timeout
27 print("Received message '{}' id='{}'".format(msg.value(), msg.message_id()))
28 consumer.acknowledge(msg)
29 except Exception as e:
30 print(f"Consumer failed with error: {e}")
31 break
32
33client.close()
- Run the producer application first, and then run the consumer application. You should observe the output as:
1Received message '{'_required_default': False, '_default': None, '_required': False, 'id': '23125303180', 'type': 'IssueCommentEvent', 'repo_id': 62117812, 'repo_name': 'apache/pulsar', 'actor_id': 25195800, 'actor_login': 'poorbarcode'}' id='(157,0,-1,-1)'
2Received message '{'_required_default': False, '_default': None, '_required': False, 'id': '23125296868', 'type': 'PullRequestEvent', 'repo_id': 62117812, 'repo_name': 'apache/pulsar', 'actor_id': 16235121, 'actor_login': 'nodece'}' id='(157,1,-1,-1)'
3Received message '{'_required_default': False, '_default': None, '_required': False, 'id': '23125261740', 'type': 'IssueCommentEvent', 'repo_id': 62117812, 'repo_name': 'apache/pulsar', 'actor_id': 84127069, 'actor_login': 'Nicklee007'}' id='(157,2,-1,-1)'
4Received message '{'_required_default': False, '_default': None, '_required': False, 'id': '23125143279', 'type': 'IssueCommentEvent', 'repo_id': 62117812, 'repo_name': 'apache/pulsar', 'actor_id': 19296967, 'actor_login': 'lordcheng10'}' id='(157,3,-1,-1)'
5Received message '{'_required_default': False, '_default': None, '_required': False, 'id': '23125069052', 'type': 'IssueCommentEvent', 'repo_id': 62117812, 'repo_name': 'apache/pulsar', 'actor_id': 2770146, 'actor_login': 'Jason918'}' id='(157,4,-1,-1)'
6Received message '{'_required_default': False, '_default': None, '_required': False, 'id': '23124956693', 'type': 'PullRequestReviewCommentEvent', 'repo_id': 62117812, 'repo_name': 'apache/pulsar', 'actor_id': 2770146, 'actor_login': 'Jason918'}' id='(157,5,-1,-1)'
7Received message '{'_required_default': False, '_default': None, '_required': False, 'id': '23124956694', 'type': 'PullRequestReviewEvent', 'repo_id': 62117812, 'repo_name': 'apache/pulsar', 'actor_id': 2770146, 'actor_login': 'Jason918'}' id='(157,6,-1,-1)'
8Received message '{'_required_default': False, '_default': None, '_required': False, 'id': '23124928986', 'type': 'PullRequestReviewCommentEvent', 'repo_id': 62117812, 'repo_name': 'apache/pulsar', 'actor_id': 2770146, 'actor_login': 'Jason918'}' id='(157,7,-1,-1)'
9Received message '{'_required_default': False, '_default': None, '_required': False, 'id': '23124928946', 'type': 'PullRequestReviewEvent', 'repo_id': 62117812, 'repo_name': 'apache/pulsar', 'actor_id': 2770146, 'actor_login': 'Jason918'}' id='(157,8,-1,-1)'
10Received message '{'_required_default': False, '_default': None, '_required': False, 'id': '23124836098', 'type': 'PullRequestReviewCommentEvent', 'repo_id': 62117812, 'repo_name': 'apache/pulsar', 'actor_id': 10999109, 'actor_login': 'zbentley'}' id='(157,9,-1,-1)'
11Received message '{'_required_default': False, '_default': None, '_required': False, 'id': '23124836090', 'type': 'PullRequestReviewEvent', 'repo_id': 62117812, 'repo_name': 'apache/pulsar', 'actor_id': 10999109, 'actor_login': 'zbentley'}' id='(157,10,-1,-1)'
12Received message '{'_required_default': False, '_default': None, '_required': False, 'id': '23124816765', 'type': 'WatchEvent', 'repo_id': 62117812, 'repo_name': 'apache/pulsar', 'actor_id': 70093548, 'actor_login': 'prdarm'}' id='(157,11,-1,-1)'
13Received message '{'_required_default': False, '_default': None, '_required': False, 'id': '23124809334', 'type': 'WatchEvent', 'repo_id': 62117812, 'repo_name': 'apache/pulsar', 'actor_id': 1854121, 'actor_login': 'lihuanghai'}' id='(157,12,-1,-1)'
14Received message '{'_required_default': False, '_default': None, '_required': False, 'id': '23124779277', 'type': 'IssueCommentEvent', 'repo_id': 62117812, 'repo_name': 'apache/pulsar', 'actor_id': 25195800, 'actor_login': 'poorbarcode'}' id='(157,13,-1,-1)'
15Received message '{'_required_default': False, '_default': None, '_required': False, 'id': '23124764141', 'type': 'PullRequestReviewEvent', 'repo_id': 62117812, 'repo_name': 'apache/pulsar', 'actor_id': 18204803, 'actor_login': 'BewareMyPower'}' id='(157,14,-1,-1)'
16Received message '{'_required_default': False, '_default': None, '_required': False, 'id': '23124724608', 'type': 'IssueCommentEvent', 'repo_id': 62117812, 'repo_name': 'apache/pulsar', 'actor_id': 95597048, 'actor_login': 'tjiuming'}' id='(157,15,-1,-1)'
17Received message '{'_required_default': False, '_default': None, '_required': False, 'id': '23124720466', 'type': 'PullRequestReviewEvent', 'repo_id': 62117812, 'repo_name': 'apache/pulsar', 'actor_id': 95597048, 'actor_login': 'tjiuming'}' id='(157,16,-1,-1)'
18Received message '{'_required_default': False, '_default': None, '_required': False, 'id': '23124666998', 'type': 'IssueCommentEvent', 'repo_id': 62117812, 'repo_name': 'apache/pulsar', 'actor_id': 12592133, 'actor_login': 'codelipenghui'}' id='(157,17,-1,-1)'
19Received message '{'_required_default': False, '_default': None, '_required': False, 'id': '23124620502', 'type': 'PullRequestEvent', 'repo_id': 62117812, 'repo_name': 'apache/pulsar', 'actor_id': 26179648, 'actor_login': 'coderzc'}' id='(157,18,-1,-1)'
20Received message '{'_required_default': False, '_default': None, '_required': False, 'id': '23124420105', 'type': 'PullRequestEvent', 'repo_id': 62117812, 'repo_name': 'apache/pulsar', 'actor_id': 18204803, 'actor_login': 'BewareMyPower'}' id='(157,19,-1,-1)'
21Received message '{'_required_default': False, '_default': None, '_required': False, 'id': '23124407286', 'type': 'CreateEvent', 'repo_id': 62117812, 'repo_name': 'apache/pulsar', 'actor_id': 18204803, 'actor_login': 'BewareMyPower'}' id='(157,20,-1,-1)'
22Received message '{'_required_default': False, '_default': None, '_required': False, 'id': '23124361624', 'type': 'PullRequestEvent', 'repo_id': 62117812, 'repo_name': 'apache/pulsar', 'actor_id': 6297296, 'actor_login': 'Technoboy-'}' id='(157,21,-1,-1)'
23Received message '{'_required_default': False, '_default': None, '_required': False, 'id': '23124360700', 'type': 'PullRequestEvent', 'repo_id': 62117812, 'repo_name': 'apache/pulsar', 'actor_id': 6297296, 'actor_login': 'Technoboy-'}' id='(157,22,-1,-1)'
24Received message '{'_required_default': False, '_default': None, '_required': False, 'id': '23124322292', 'type': 'WatchEvent', 'repo_id': 62117812, 'repo_name': 'apache/pulsar', 'actor_id': 8947816, 'actor_login': 'jiankafei'}' id='(157,23,-1,-1)'
25Received message '{'_required_default': False, '_default': None, '_required': False, 'id': '23124284400', 'type': 'IssueCommentEvent', 'repo_id': 62117812, 'repo_name': 'apache/pulsar', 'actor_id': 6297296, 'actor_login': 'Technoboy-'}' id='(157,24,-1,-1)'
26Received message '{'_required_default': False, '_default': None, '_required': False, 'id': '23124262048', 'type': 'IssueCommentEvent', 'repo_id': 62117812, 'repo_name': 'apache/pulsar', 'actor_id': 18204803, 'actor_login': 'BewareMyPower'}' id='(157,25,-1,-1)'
27Received message '{'_required_default': False, '_default': None, '_required': False, 'id': '23124222335', 'type': 'IssueCommentEvent', 'repo_id': 62117812, 'repo_name': 'apache/pulsar', 'actor_id': 25195800, 'actor_login': 'poorbarcode'}' id='(157,26,-1,-1)'
28Received message '{'_required_default': False, '_default': None, '_required': False, 'id': '23124152744', 'type': 'IssueCommentEvent', 'repo_id': 62117812, 'repo_name': 'apache/pulsar', 'actor_id': 50226895, 'actor_login': 'Anonymitaet'}' id='(157,27,-1,-1)'
29Received message '{'_required_default': False, '_default': None, '_required': False, 'id': '23124019314', 'type': 'IssueCommentEvent', 'repo_id': 62117812, 'repo_name': 'apache/pulsar', 'actor_id': 41898282, 'actor_login': 'github-actions[bot]'}' id='(157,28,-1,-1)'
30Received message '{'_required_default': False, '_default': None, '_required': False, 'id': '23124009068', 'type': 'IssueCommentEvent', 'repo_id': 62117812, 'repo_name': 'apache/pulsar', 'actor_id': 41898282, 'actor_login': 'github-actions[bot]'}' id='(157,29,-1,-1)'
31Consumer failed with error: Pulsar error: TimeOut
Meet some troubles? #
File a ticket on our user forum, or ask in your native tongue.
Want to learn more? #
- Read Pulsar Python Client document for more usages and API references.
- Read Pulsar Messaging Concepts document for understanding how Pulsar serves messaging and streaming.
- Read StreamNative Cloud document for playing with enterprise-level Pulsar as a Service.
Follow @StreamNative on Twitter to stay updated on StreamNative Cloud news!