<CF_HOME>/cfusion/bin に移動します。
Pub/Sub はパブリッシュ/サブスクライブ(Pub/Sub)サービスであり、メッセージの送信者と受信者を分離するメッセージングプラットフォームです。Pub/Sub サービスには、次のようないくつかの重要なエンティティがあります。
- メッセージ:サービス内を移動するデータ。
- トピック:メッセージのフィードを表す、名前付きエンティティ。
- サブスクリプション:特定のトピックについてのメッセージ受信における関心を表す、名前付きエンティティ。
- パブリッシャー(プロデューサーとも呼ばれる):メッセージを作成し、指定したトピックのメッセージングサービスに送信(パブリッシュ)します。
- サブスクライバー(コンシューマーとも呼ばれる):指定されたサブスクリプションでメッセージを受信します。
詳しくは、GCP Pub/Sub を参照してください。
-
-
次のコマンドを入力します。
- Windows:cfpm.bat
- Windows 以外の場合:/cfpm.sh
-
コマンド install gcppubsub を入力します。
パッケージがインストールされるまで待ちます。
非インタラクティブモードでのパッケージのインストール
コマンドプロンプトを開き、coldFusion/cfusion/bin フォルダーに移動して、cfpm install gcppubsub
と入力します。
詳しくは、ColdFusion パッケージマネージャーを参照してください。
GCP PubSub にアクセスするための資格情報の取得
GCP を操作する場合、資格情報を検証し、リクエストしているリソースにアクセスするための権限を持っているかどうかをチェックするために、GCP セキュリティ資格情報を指定します。
GCP は、セキュリティ資格情報を使用して、リクエストを認証および承認します。
詳しくは、GCP API キーの概要を参照してください。
GCP サービスの認証
GCP サービスにアクセスするには、Google Cloud コンソールを使用してサービスアカウントを作成します(または、gcloud CLI、REST 呼び出しを使用するか、プログラムで実行することもできます)。このようなアカウントは、Google Cloud の ID およびアクセス管理(IAM)によって管理されています。
詳しくは、GCP サービスの認証を参照してください。
クラウドサービス資格情報および設定の追加
様々なクラウドサービスにアクセスするための、オブジェクトを作成するハンドルを提供するメソッド getCloudService() を使用します。
サービスハンドルのシンタックスを次に示します。
service=getCloudService(struct cloudCred,struct cloudConfig), where:
- cloudCred:クラウドサービスの資格情報を定義します。構造体または文字列です(資格情報エイリアスとも呼ばれます)。
- cloudConfig:クラウドサービス設定の詳細を定義します。構造体または文字列です(設定エイリアスとも呼ばれます)。
データとサービス/クラウド資格情報で資格情報を追加します。
資格情報と設定を作成した後は、cfm ファイルで、資格情報と設定を pubsubclient=getCloudService(cred, conf) として定義できます。
追加
add cloudconfiguration alias=pubsubdemo servicename=pubsub
設定
set cloudconfiguration pubsubdemo elementCountThreshold=102 requestByteThreshold=2048 delayThreshold=10m
以下のリストでは、その他のパラメーターについて説明します。
- publisher.endpoint
- enableMessageOrdering
- enableCompression
- compressionBytesThreshold
- enableBatching
- delayThreshold
- elementCountThreshold
- requestByteThreshold
- publisher.limitExceededBehavior
- publisher.maxOutstandingElementCount
- publisher.maxOutstandingRequestBytes
- publisher.threadCount
- publisher.threadNamePrefix
- publisher.initialRetryDelay
- publisher.maxRetryDelay
- publisher.retryDelayMultiplier
- publisher.initialRpcTimeout
- publisher.maxRpcTimeout
- publisher.rpcTimeoutMultiplier
- publisher.totalTimeout
- publisher.maxAttempts
- subscriber.endpoint
- maxAckExtensionPeriod
- minDurationPerAckExtensionPeriod
- maxDurationPerAckExtensionPeriod
- parallelPullCount
- useLegacyFlowControl
- subscriber.limitExceededBehavior
- subscriber.maxOutstandingElementCount
- subscriber.maxOutstandingRequestBytes
- subscriber.threadCount
- subscriber.threadNamePrefix
- system.threadCount
- system.threadNamePrefix
使用例
set cloudconfiguration pubsubdemo publisher.endpoint=pubsub.googleapis.com subscriber.endpoint=us-east1-pubsub.googleapis.com:443 enableMessageOrdering=false enableCompression=False compressionBytesThreshold=1 delayThreshold='2m’
表示
- クラウド資格情報:show cloudcredential <cred_name>
- クラウド設定:show cloudcredential <cred_name>
削除
- クラウド資格情報:delete cloudcredential <cred_name>
- クラウド設定:delete cloudconfiguration <conf_name>
すべての GCP リソース名では大文字と小文字が区別されます。
トピック、サブスクリプション、スキーマ、スナップショットの名前に関するガイドライン
名前に関するガイドラインについては、このドキュメントを参照してください。
PubSub のトピックのメッセージ形式
GCP Pubsub トピックに送信されるメッセージ形式。
{
“data”: string/struct/CFC/query/array/binary data
“attributes”: {
string: string,
...
},
“messageId”: string,
“publishTime”: string,
“orderingKey”: string
}
PullMessages メソッドを使用してサブスクリプションからメッセージをプルするときに、GCP PubSub サービスから受信するメッセージ形式:
Message = {
MessageID: “”,
Data: “Hello World!”,
Attributes: {
year: “2020”,
author: “Martin Kleppman”
}, // 属性のキーと値のペアは文字列のみである必要があります。その他のデータ型には対応していません。OrderingKey: “”,
PublishTime: “”,
AttributesCount: 2,
AckId: “”,
DeliveryAttempts: 1
}
登録されたメッセージハンドラーを使用してメッセージを非同期に受信するときに、onSuccess メッセージハンドラーで受信するメッセージ形式:
Message = {
MessageID: “”,
Data: “Hello World!”,
Attributes: {
year: “2020”,
author: “Martin Kleppmann”
}, // 属性のキーと値のペアは文字列のみである必要があります。その他のデータ型には対応していません。OrderingKey: “”,
PublishTime: “”,
AttributesCount: 2
}
属性を含むトピックの作成
シンタックス
createTopic(topicName [, structAttributes])
パラメーター
トピック属性構造体には、次のキーと値のペアが含まれています。
| パラメーター | 説明 | 必須 |
|---|---|---|
| topicName | 作成するトピックの名前。 | はい |
messageRetentionDuration |
Pub/Sub トピックがパブリッシュ後にメッセージを保持する期間を指定します。メッセージの保持期間が経過すると、確認応答状態にかかわらず、Pub/Sub によりメッセージが破棄される可能性があります。 トピック内のメッセージの表示時間の値の許容範囲は、10 分から 31 日です。受け入れられる期間パラメーターは次のとおりです。
|
はい |
schemaSettings |
構造体には、次が含まれます。
|
はい |
labels |
キーと値のペアとしてのトピックラベル |
はい |
messageStoragePolicy |
メッセージが保存されるリージョンを指定します。トピックには、メッセージストレージポリシーを設定することができます。 |
はい |
kmsKeyName |
トピックが CMEK(顧客管理の暗号鍵)で暗号化されるかどうかを指定します。Pub/Sub はデフォルトでは、Google が管理する鍵を使用してメッセージを暗号化します。詳しくは、このドキュメントを参照してください。 |
はい |
戻り値
トピック ID。
次に例を示します。
<cfscript>
cred = {
projectId : “<your project id>”,
credentialJsonFilePath : “<path to your credentials json file>”
}
conf = {
serviceName : “pubsub”
}
pubsubclient=getCloudService(cred, conf)
topicName = “mytopic”
topicAttributes = {
messageRetentionDuration : “600s”,
schemaSettings : {
schemaName : “my-schema”,
schemaEncoding : “JSON” // or “BINARY”
},
labels : {
“shape” : “circle”,
“color” : “green”
},
messageStoragePolicy: ["asia-northeast1"],
kmsKeyName = "projects/my-gcp-project/locations/asia-northeast1/keyRings/customKMSKey/cryptoKeys/customManagedKey"
}
Topic = pubsubClient.createTopic(topicName, topicAttributes)
</cfscript>
トピックのリスト表示
シンタックス
pubSubClient.listTopics(listAttributesStruct)
トピックの数を返します。
デフォルトでは、100 のトピックが返されます。戻り値の型はリストです。
パラメーター
| パラメーター | 説明 | 必須 |
| pageSize | 返されるサブスクリプションの最大数。 | いいえ |
| nextPageToken | サブスクリプションの次のページを返す値。 | いいえ |
トピックリストの次のページについては、listTopics を呼び出し、属性構造体の nextPageToken 値を渡します。
例
<cfscript>
cred = {
projectId : “<your project id>”,
credentialJsonFilePath : “<path to your credentials json file>”
}
conf = {
serviceName : “pubsub”
}
Pubsubclient = getCloudService(cred, conf)
topics = pubsubclient.listTopics({pageSize: 100})
Writedump(topics)
</cfscript>
トピックの削除
シンタックス
deleteTopic(topicName)
パラメーター
パラメーター |
説明 |
topicName |
削除するトピックの名前。 |
例
<cfscript>
cred = {
projectId : “<your project id>”,
credentialJsonFilePath : “<path to your credentials json file>”
}
conf = {
serviceName : “pubsub”
}
pubsubclient=getCloudService(cred, conf)
topicName="mytopic"
pubsubclient.createTopic(topicName)
pubsubclient.deleteTopic(topicName)
</cfscript>
トピックのサブスクリプションのリスト表示
シンタックス
listTopicSubscriptions(topicName)
パラメーター
パラメーター |
説明 |
topicName |
サブスクリプションのあるトピックの名前。 |
次に例を示します。
<cfscript>
cred = {
projectId : “<your project id>”,
credentialJsonFilePath : “<path to your credentials json file>”
}
conf = {
serviceName : “pubsub”
}
pubsubclient=getCloudService(cred, conf)
topicName=“mytopic”
subName1=“subscription_01”
subName2=“subscription_02”
subscriptionMetadata1 = {
“subscriptionName” : subName1
}
subscriptionMetadata2 = {
“subscriptionName” : subName2
}
topic=pubsubclient.createTopic(topicName)
// add a subscription
subscription1=topic.subscribe(subscriptionMetadata1)
subscription2=topic.subscribe(subscriptionMetadata2)
subscriptions = topic.listSubscriptions()
writeDump(subscriptions)
</cfscript>
トピックの更新
シンタックス
updateTopic(topicName, topicAttributes)
次に例を示します。
<cfscript>
cred = {
projectId : “<your project id>”,
credentialJsonFilePath : “<path to your credentials json file>”
}
conf = {
serviceName : “pubsub”
}
pubsubclient=getCloudService(cred, conf)
topicName = “mytopic”
topicAttributes= {
labels : {
shape : “square”,
color : “blue”
},
messageStoragePolicy: [“asia-east1-a”, “europe-west1-c”],
messageRetentionDuration: “600s”,
schemaSettings : {
schemaName : “testSchema”,
schemaEncoding : “Binary”
}
}
topic = pubsubclient.createTopic(topicName, topicAttributes)
updatedTopicAttributes = {
labels : {
size : “10cm”,
},
messageRetentionDuration: “300s”
}
pubsubclient.updateTopic(topicName, updatedTopicAttributes)
</cfscript>
トピックオブジェクトのメソッド
- Struct publish(Struct messageMetadata)
- Subscription subscribe(String subscriptionName)
- Subscription subscribe(Struct subscriptionMetadata)
- Subscription getSubscription(String subscriptionName)
- Subscription updateSubscription(Struct subscriptionMetadata)
- Struct detachSubscription(String subscriptionName)
- Struct deleteSubscription(String subscriptionName)
- Struct listSubscriptionsS)
- Struct listSubscriptions(Struct listMetadata)S
- Struct listSnapshots()
- Struct listSnapshots(Struct listMetadata)
- String getId()
- String getName()
- void setKmsKeyName(String keyName)
- void setMessageRetentionDuration(String duration)
- void setMessageStoragePolicy(Array messageStoragePolicy)
- void setLabels(Struct labels)
- void addLabels(Struct labels)
- String getKmsKeyName()
- String getMessageRetentionDuration()
- Array getMessageStoragePolicy()
- Struct getLabels()
- boolean containsLabels(String key)
- Struct getSchemaSettings()
- void delete()
- Struct getIAMPolicy(Struct policyMap)
- Struct setIAMPolicy(Struct policyMap)
- Struct addIAMPolicy(Struct policyMap)
- Struct testIAMPermissions(Struct permissionsMap)
- Struct publishAllOutstanding()
- Struct resumePublish(String orderingKey)
サブスクリプション管理
サブスクリプションの接続解除
説明
ユーザーがサブスクリプションを解除すると、サブスクリプションはトピックからデータを読み取ることができなくなります。サブスクリプションに保存されているすべてのメッセージは、未確認および確認済みにかかわらず削除されます。
シンタックス
detachSubscription(subscriptionName)
戻り値
サブスクリプションの構造体。
パラメーター
パラメーター |
説明 |
必須 |
subscriptionName |
トピックから接続解除されるサブスクリプションの名前。サブスクリプションが削除されることはありません。別のトピックに接続したり、元のトピックに再接続したりすることができます。 |
はい |
次に例を示します。
<cfscript>
cred = {
projectId : “<your project id>”,
credentialJsonFilePath : “<path to your credentials json file>”
}
conf = {
serviceName : “pubsub”
}
pubsubclient=getCloudService(cred, conf)
topicName = “myTopic”
subName=“mySubscription”
subscriptionAttributes= {
“subscriptionName” : subName
}
topic = pubsubclient.createTopic(topicName)
subscription=topic.subscribe(subscriptionMetadata)
topic.detachSubscription(subName)
</cfscript>
サブスクリプションの更新
シンタックス
Pubsubclient.updateSubscription(subscriptionAttributesStruct)
Subscription.updatesubscription(subscriptionAttributesStruct)
すべての PubSub エンティティ名では大文字と小文字が区別されます。
パラメーター
パラメーター |
説明 |
必須 |
| topicName | トピックの名前。 | はい |
SubscriptionName |
サブスクリプション ID |
はい |
deliveryConfig |
構造体には、次が含まれます。
|
はい |
retryPolicy |
構造体には、次が含まれます。
|
はい |
deadLetterPolicy |
構造体には、次が含まれます。
|
はい |
ラベル |
Struct containing- author, year, env. |
はい |
フィルター |
トピックのフィルター属性。 |
はい |
次に例を示します。
<cfscript>
cred = {
projectId : “<your project id>”,
credentialJsonFilePath : “<path to your credentials json file>”
}
conf = {
serviceName : “pubsub”
}
pubsubclient=getCloudService(cred, conf)
topicName = “myTopic”
subName= “mySubscription”
subscriptionDetails = {
subscriptionName : subName,
topicName : topicName,
AckDeadlineSeconds : “600s”,
enableExactlyOnceDelivery : false,
enableMessageOrdering : false,
retainAckedMessages : false,
MessageRetentionDuration = “700s”
filter = “attributes.color='green”
}
subscription= pubsubclient.subscribe(subscriptionDetails)
label=“listProjectSubscriptions”)
newAttributes = {
subscriptionName : subName,
enableExactlyOnceDelivery : true,
AckDeadlineSeconds : “400s”,
enableMessageOrdering : true,
retainAckedMessages : true,
MessageRetentionDuration : “120s”
filter = “attributes.color='red”
}
pubsubClient.updateSubscription(newAttributes)
</cfscript>
サブスクリプションの削除
シンタックス
deleteSubscription(subscriptionName)
パラメーター
パラメーター |
説明 |
必須 |
subscriptionName |
トピックから削除するサブスクリプションの名前。 |
はい |
次に例を示します。
<cfscript>
cred = {
projectId : “<your project id>”,
credentialJsonFilePath : “<path to your credentials json file>”
}
conf = {
serviceName : “pubsub”
}
pubsubclient=getCloudService(cred, conf)
topicName = “myTopic”
subName=“mySubscription”
subscriptionAttributes = {
"subscriptionName" : subName
}
topic = pubsubclient.createTopic(topicName)
topic.subscribe(subscriptionAttributes)
topic.deleteSubscription(subName)
</cfscript>
<cfscript>
cred = {
projectId : “<your project id>”,
credentialJsonFilePath : “<path to your credentials json file>”
}
conf = {
serviceName : “pubsub”
}
pubsubclient=getCloudService(cred, conf)
topicName=“myTopic”
topic= pubsubClient.createTopic(topicName)
message = {
data: “hello world!”}
subName = "mysubscription"
subscriptionAttributes= {
"subscriptionName" : subName,
"topicName" : topicName
}
topic.subscribe(subscriptionAttributes)
cffuture=topicObj.publish(message)
MessageID = Cffuture.get()
writeOutput("Message with ID #MessageID# published")
</cfscript>
パブリッシュしたメッセージの例
非同期にメッセージのトピックへのパブリッシュ
messageMetadata = {
data: "Hello World 12345!",
attributes: {
author: "Martin Kleppmann",
age: 33,
}
}
publishedFuture = topic.publish(messageMetadata)
WriteDump(publishedFuture.get()) // パブリッシュされたメッセージ ID を返す必要があります。
Avro エンコードによるメッセージのトピックへのパブリッシュ
MyCFC cfcObj = new MyCFC()
messageMetadata = {
data: serializeAVRO(cfcObj, schema),
attributes: {
"attribute1" : "value1",
"attribute2" : "value2"
}
}
publishedFuture = topic.publish(messageMetadata)
WriteDump(publishedFuture.get()) // パブリッシュされたメッセージ ID を返す必要があります。
Protobuf エンコードによるメッセージのトピックへのパブリッシュ
MyCFC cfcObj = new MyCFC()
messageMetadata = {
data: serializeProtoBuf(cfcObj, schema),
attributes: {
"attribute1" : "value1",
"attribute2" : "value2"
}
}
publishedFuture = topic.publish(messageMetadata)
WriteDump(publishedFuture.get()) // パブリッシュされたメッセージ ID を返す必要があります。
バッチ設定によるメッセージのトピックへのパブリッシュ
<cfscript>
gcpCred = {
vendorName: "GCP",
alias: "gcp_cred_alias" // このエイリアスは CF 管理者により定義される必要があります。}
pubsubConfig = {
alias: "PubSubOne",
serviceName: "PUBSUB",
publisherSettings: {
batchingSettings: {
enable: true, // default: false
elementCountThreshold : 1, // default: 100 messages
requestByteThreshold : 50, // default: 1000 bytes
delayThreshold : "100m" // default: 1 milliseconds
}
}
};
pubsub = getCloudService(gcpCred, pubsubConf)
myTopic = pubsub.getTopic("myTopic")
messageMetadata = {
"data" : "Hello World!!" }
publishedFuture = topic.publish(messageMetadata)
WriteDump(publishedFuture.get()) // パブリッシュされたメッセージ ID を返す必要があります。</cfscript>
同時実行制御によるメッセージのトピックへのパブリッシュ
<cfscript>
gcpCred = {
vendorName:"GCP",
alias: "gcp_cred_alias" // このエイリアスは CF 管理者により定義される必要があります。}
pubsubConfig = {
alias: "PubSubOne",
serviceName: "PUBSUB",
publisherSettings: {
executorProvider: {
threadCount: 4,
threadNamePrefix: "cf-pubsub-thread"
}
}
}
pubsub = getCloudService(gcpCred, pubsubConf)
topic = pubsub.getTopic("myTopic")
messageMetadata = {
"data" : "Hello"
}
publishedFuture = topic.publish(messageMetadata)
WriteDump(publishedFuture.get()) // パブリッシュされたメッセージ ID を返す必要があります。</cfscript>
カスタム属性によるメッセージのトピックへのパブリッシュ
<cfscript>
gcpCred = {
vendorName: "GCP",
alias: "gcp_cred_alias" // このエイリアスは CF 管理者により定義される必要があります。}
pubsubConf = {
alias: "pubsub_conf_alias",
serviceName: "PUBSUB"
}
pubsub = cloudService(gcpCred, pubsubConf)
topic = pubsub.getTopic("myTopic")
messageMetadata = {
message: {
data: "Hello World 12345!",
attributes: {
author : "Martin Fowler",
year : 2014
}
}
}
publishedFuture = topic.publish(messageMetadata)
WriteDump(publishedFuture.get()) // パブリッシュされたメッセージ ID を返す必要があります。</cfscript>
フロー制御設定によるメッセージのトピックへのパブリッシュ
<cfscript>
gcpCred = {
vendorName:"GCP",
alias: "gcp_cred_alias" // このエイリアスは CF 管理者により定義される必要があります。}
pubsubConfig = {
alias: "PubSubOne",
serviceName: "PUBSUB",
publisherSettings: {
batchingSettings: {
enable: true,
elementCountThreshold: 1, // default: 100 messages
requestByteThreshold: 50, // default: 1000 bytes
delayThreshold: "100m", // default: 1 milliseconds
flowControlSettings: {
maxOutstandingRequestBytes : 10240,
maxOutstandingElementCount : 100,
limitExceededBehavior : "Block" // Supported values: Block, Ignore, ThrowException
}
}
}
}
pubsub = getCloudService(gcpCred,pubsubConf)
topic = pubsub.getTopic("myTopic")
messageMetadata = {
message: {
data : "Hello World 12345!",
attributes : {
author : "Martin Fowler",
year : 2014
}
}
}
publishedFuture = topic.publish(messageMetadata)
WriteDump(publishedFuture.get()) // パブリッシュされたメッセージ ID を返す必要があります。</cfscript>
順序指定キーによるメッセージのトピックへのパブリッシュ
<cfscript>
gcpCred = {
vendorName:"GCP",
alias: "gcp_cred_alias" // このエイリアスは CF 管理者により定義される必要があります。}
pubsubConfig = {
alias: "PubSubOne",
serviceName: "PUBSUB",
publisherSettings: {
endpoint: "asia-northeast1-pubsub.googleapis.com:443",
enableMessageOrdering: "true"
}
}
pubsub = getCloudService(gcpCred, pubsubConf)
topic = pubsub.getTopic("myTopic")
messageMetadata = {
message: {
data: "Hello World 12345!",
orderingKey: "NamingFilter"
}
}
publishedFuture = topic.publish(messageMetadata)
WriteDump(publishedFuture.get()) // パブリッシュされたメッセージ ID を返す必要があります。</cfscript>
再試行設定によるメッセージのトピックへのパブリッシュ
<cfscript>
gcpCred = {
vendorName:"GCP",
alias: "gcp_cred_alias" // このエイリアスは CF 管理者により定義される必要があります。}
pubsubConfig = {
alias: "PubSubOne",
serviceName: "PUBSUB",
publisherSettings: {
retrySettings: {
initialRetryDelay. : "100m", // default: 100 ms
initialRpcTimeout : "5s", // default: 5 seconds
maxAttempts : 5, // default: 0
maxRetryDelay : "60s", // default : 60 seconds
maxRpcTimeout : "10M", // default: 600 seconds
maxDelayMultiplier : 2, // back off for repeated failures, default: 1.3
rpcTimeoutMultiplier : 1, // default: 1.0
totalTimeout : "10M" // default: 600 seconds
}
}
}
pubsub = getCloudService(gcpCred,pubsubConfing)
topic = pubsub.getTopic("myTopic")
messageMetadata = {
"messages" : [{
data : "Hello World!" }]
}
publishedFuture = topic.publish(messageMetadata)
WriteDump(publishedFuture.get()) // パブリッシュされたメッセージ ID を返す必要があります。</cfscript>
未処理のメッセージをすべてパブリッシュ
シンタックス
publishAllOutstanding(topicName)
パラメーター
| パラメーター | 説明 | 必須 |
| topicName |
トピックの名前。 | はい |
例
<cfscript>
cred = {
projectId : “<your project id>“,
credentialJsonFilePath : “<path to your credentials json file>”
}
conf = {
serviceName : “pubsub”
}
pubsubclient = getCloudService(cred, conf)
response = pubsubclient.publishAllOutstanding(“myTopic”)
WriteDump(response)
</cfscript>
サブスクリプションオブジェクトのメソッド
- String getId()
- String getName()
- String getTopicId()
- String getTopicName()
- int getAckDeadlineSeconds()
- Struct getLabels()
- String getMessageRetentionDuration()
- boolean isEnableExactlyOnceDelivery()
- boolean isRetainAckedMessages()
- boolean isRetainAckedMessages()
- boolean isDetached()
- boolean containsLabels(String key)
- String getFilter()
- Struct getExpirationPolicy()
- Struct getRetryPolicy()
- Struct getDeadLetterPolicy()
- Struct getPushConfig()
- Struct getBigQueryConfig()
- void setAckDeadlineSeconds(long seconds)
- void setLabels(Struct labels)
- void addLabels(Struct labels)
- void setMessageRetentionDuration(String duration)
- void setEnableExactlyOnceDelivery(boolean enableExactlyOnceDelivery)
- void setRetainAckedMessages(boolean retainAckedMessages)
- Struct setDetached(boolean detached)
- void setExpirationPolicy(Struct expirationPolicy)
- void setRetryPolicy(Struct retryPolicy)
- void setDeadLetterPolicy(Struct deadLetterPolicy)
- void setDeliveryConfig(Struct deliveryConfig)
- void registerMessageHandler(Struct messageHandlerMetadata)
- Struct pullMessages(Struct pullMessageMetadata)
- Struct acknowledgeMessages(Struct acknowledgeMetadata)
- Struct modifyAckDeadline(Struct ackDeadlineMetadata)
- Struct seekMessages(Struct seekMessageMetadata)
- Struct getIAMPolicy(Struct policyStruct)
- Struct setIAMPolicy(Struct policyStruct)
- Struct addIAMPolicy(Struct policyStruct)
- Struct testIAMPermissions(Struct permissionsStruct)
- Struct delete()
<cfscript>
cred = {
projectId : "<your project id>",
credentialJsonFilePath : "<path to your credentials json file>"
}
conf = {
serviceName : "pubsub"
}
pubsubclient=getCloudService(cred, conf)
topicName = "myTopic"
topic = pubsubClient.createTopic(topicName)
subName="mysubscription"
subscriptionMetadata = {
"subscriptionName" : subName
}
// サブスクリプションの作成
topic.subscribe(subscriptionMetadata)
// スナップショットの作成
snapshotName="snshot-01"
snapshotAttributes = {
snapshotName: snapshotName,
subscriptionName: subName,
labels : {
name : "wrench",
mass : "13kg",
createdtime : dateTimeFormat(now(), "hh-nn_mm-dd-yyyy"),
emptykey : ""
}
}
snapshotResponse=pubsubClient.createSnapshot(snapshotAttributes)
</cfscript>
スナップショットの取得
シンタックス
getSnapshot(snapshotName)
パラメーター
パラメーター |
説明 |
必須 |
snapshotName |
詳細を取得するスナップショットの名前。 |
はい |
例
<cfscript>
cred = {
projectId : "my-gcp-project",
credentialJsonFilePath : "path of creds json file"
};
conf = {
serviceName : "pubsub"
};
pubsubclient=getCloudService(cred, conf)
topicName = "topic-ss-02"
// トピックの作成
topic = pubsubClient.createTopic(topicName)
subName="sub-name-ss-02"
subscriptionMetadata = {
"subscriptionName" : subName
}
// サブスクリプションの作成
topic.subscribe(subscriptionMetadata)
// スナップショットの作成
snapshotName="snshot-02"
snapshotAttributes = {
snapshotName: snapshotName,
subscriptionName: subName,
labels : {
name : "wrench",
mass : "13kg",
createdtime : dateTimeFormat(now(), "hh-nn_mm-dd-yyyy"),
emptykey : ""
}
}
// スナップショットの作成
pubsubClient.createSnapshot(snapshotAttributes)
// スナップショットの取得
getResponse=pubsubClient.getSnapshot(snapshotName)
writeDump(getResponse)
</cfscript>
スナップショットの更新
シンタックス
updateSnapshot(updatedSnapshotAttributes)
次に例を示します。
<cfscript>
cred = {
projectId : "my-gcp-project",
credentialJsonFilePath : "C:\GCP-keys\my-gcp-project-ab38bee5f1e0.json"
};
conf = {
serviceName : "pubsub"
};
pubsubclient=getCloudService(cred, conf)
topicName="topic-update-ss01"
// トピックの作成
topic = pubsubClient.createTopic(topicName)
subName="subupdate-ss01"
subscriptionMetadata = {
"subscriptionName" : subName
}
// サブスクリプションの作成
topic.subscribe(subscriptionMetadata)
// スナップショットの作成
snapshotName="snshot-ss-01"
snapshotAttributes = {
snapshotName: snapshotName,
subscriptionName: subName,
labels : {
name : "wrench",
mass : "13kg",
createdtime : dateTimeFormat(now(), "hh-nn_mm-dd-yyyy"),
emptykey : ""
}
}
// スナップショットの作成
pubsubClient.createSnapshot(snapshotAttributes)
updatedSnaphotAttributes={
snapshotName: snapshotName,
subscriptionName: subName,
labels : {
name : "shovel",
mass : "5kg",
createdtime : dateTimeFormat(now(), "hh-nn_mm-dd-yyyy"),
emptykey : ""
}
}
// スナップショットの更新
pubsubClient.updateSnapshot(updatedSnaphotAttributes)
writeOutput("Snapshot " & snapshotName & " updated succcessfully")
</cfscript>
スナップショットのリスト表示
シンタックス
listSnapshots()
パラメーター
パラメーター |
説明 |
nextPageToken |
結果の次のページを取得して、同じリクエストを実行します。 |
次に例を示します。
<cfscript>
cred = {
projectId : "my-gcp-project",
credentialJsonFilePath : "C:\GCP-keys\my-gcp-project-ab38bee5f1e0.json"
}
conf = {
serviceName : "pubsub"
}
pubsubclient=getCloudService(cred, conf)
listResponse=pubsubclient.listSnapshots()
writeDump(listResponse)
</cfscript>
スナップショットの削除
シンタックス
deleteSnapshot(snapshotName)
パラメーター
パラメーター |
説明 |
snapshotName |
削除するスナップショットの名前。 |
次に例を示します。
<cfscript>
cred = {
projectId : "my-gcp-project",
credentialJsonFilePath : "C:\GCP-keys\my-gcp-project-ab38bee5f1e0.json"
};
conf = {
serviceName : "pubsub"
};
pubsubclient=getCloudService(cred, conf)
topicName = "topic-ss-delete"
// トピックの作成
topic = pubsubClient.createTopic(topicName)
subName="sub-name-ss-delete"
subscriptionMetadata = {
"subscriptionName" : subName
}
// サブスクリプションの作成
topic.subscribe(subscriptionMetadata)
// スナップショットの作成
snapshotName="snshot-delete"
snapshotAttributes = {
snapshotName: snapshotName,
subscriptionName: subName,
labels : {
name : "wrench",
mass : "13kg",
createdtime : dateTimeFormat(now(), "hh-nn_mm-dd-yyyy"),
emptykey : ""
}
}
// スナップショットの作成
pubsubClient.createSnapshot(snapshotAttributes)
// スナップショットの削除
pubsubClient.deleteSnapshot(snapshotName)
writeOutput("Snapshot deleted successfully")
</cfscript>
メッセージのシーク
メッセージを確認すると、Pub/Sub はそのメッセージを回復できなくなります。ただし、何かを間違って確認した場合など、確認済みのメッセージを再生する必要があると感じる場合があります。その場合、シーク機能を利用して、以前に受け入れられた通信を未確認としてマークし、Pub/Sub にこれらのメッセージを再配信させることができます。ステータスを確認済みに設定すると、シークを利用して未確認メッセージを削除することもできます。
SnapshotName または timestamp を使用して、メッセージをシークすることができます。両方を使用することはできません。
シンタックス
seekMessages(requestParameters)
パラメーター
パラメーター |
説明 |
必須 |
snapshotName |
スナップショットの名前。 |
はい |
subscriptionName |
サブスクリプションの名前。 |
はい |
timeStamp |
スナップショットのタイムスタンプ。 |
いいえ |
次に例を示します。
<cfscript>
cred = {
projectId : "adbe-gcp0318",
credentialJsonFilePath : "C:\GCP-keys\adbe-gcp0318-ab38bee5f1e0.json"
};
conf = {
serviceName : "pubsub"
};
pubSubClient=getCloudService(cred,conf)
// 名前の設定
topicName = "topic-ss-seek"
subName = "sub_ss";
snapshotName="snapshot_ss"
// トピックの作成
topic = pubsubClient.createTopic(topicName)
subscriptionMetadata = {
"subscriptionName" : "#subName#",
"topicName" : "#topicName#"
}
// トピックへのサブスクライブ
subscription = topic.subscribe(subscriptionMetadata)
// スナップショットの作成
snapshot = pubsubClient.createSnapshot({
snapshotName: snapshotName,
subscriptionName: subName
})
writeOutput("seeking to snapshot..replaying msgs.<hr>")
snapshotID=snapshot.getID()
seekAttributes = {
subName,
snapshotID
}
seekResponse = pubsubClient.seekMessages(seekAttributes)
writeDump(seekAttributes)
</cfscript>
スナップショットオブジェクトのメソッド
- String getId()
- String getName()
- String getTopic()
- OleDateTime getExpireTime()
- boolean hasExpireTime()
- boolean containsLabels(String key)
- Struct getLabels()
- void setLabels(Struct labels)
- void addLabels(Struct labels)
- Struct delete()
- Struct getIAMPolicy(Struct policyStruct)
- Struct setIAMPolicy(Struct policyStruct)
- Struct addIAMPolicy(Struct policyStruct)
- Struct testIAMPermissions(Struct permissionsStruct)
セキュリティ API
GCP PubSub は、Identity and Access Management を使用して、アクセスを制御を行います。アクセス制御は、プロジェクトレベルとリソースレベルで構成できます。詳しくは、アクセス制御を参照してください。
IAM ポリシーの追加
シンタックス
setIAMolicy(subscriptionPolicyMetadata)
パラメーター
パラメーター |
説明 |
resourceName |
サブスクリプションの名前。 |
resourceType |
有効な値は次のとおりです。
|
bindings |
IAM ポリシーを追加できるメンバーのロールとメールを指定します。ロールの有効な値は次のとおりです。
|
次に例を示します。
<cfscript>
cred = {
projectId : "my-gcp-project",
credentialJsonFilePath : "C:\GCP-keys\my-gcp-project-ab38bee5f1e0.json"
};
conf = {
serviceName : "pubsub"
};
pubsubclient=getCloudService(cred, conf)
topicName="topic-add-iam"
// トピックの作成
topic = pubsubClient.createTopic(topicName)
// サブスクリプションの作成
subName="sub-add-iam"
subscriptionMetadata = {
"subscriptionName" : subName
}
topic.subscribe(subscriptionMetadata)
// IAM ポリシーの追加
addIamPolicy = {
resourceName : subscriptionName,
resourceType : "Subscription",
bindings : {
role : "roles/viewer",
members : ["user:user@example.com"]
}
}
pubsubClient.addIAMPolicy(addIamPolicy)
writeOutput("IAM policy added successfully")
</cfscript>
IAM ポリシーの設定
シンタックス
setIAMPolicy(policyStruct)
パラメーター
| パラメーター | 説明 | 必須 |
| subName | IAM ポリシーを設定する必要があるサブスクリプションの名前。 |
はい |
例
<cfscript>
this.gcpCred = {
"vendorName":"GCP",
"alias": "gcp_cred_alias",
"projectid": "{ProjectID}"
}
this.pubsubConf = {
"alias":"pubsub_conf_alias",
"serviceName" : "PUBSUB",
}
pubsub = cloudService(this.gcpCred, this.pubsubConf)
// 新規ロールの作成 > メンバーのバインディングとトピックポリシーの更新
topicPolicyMetadata = {
"resource" : "topicName",
"policy" : "oldPolicy",
"bindings" : {
"role" : "roles/pubsub.editor",
"members" : "allUsers",
"conditions" : ""
}
}
// IAM ポリシーの設定
pubsub.setIAMPolicy(topicPolicyMetadata)
// IAP ポリシーの取得
pubsub.getIAMPolicy("topicName")
</cfscript>
IAM ポリシーの取得
シンタックス
getIAMPolicy(policyStruct)
パラメーター
| パラメーター | 説明 | 必須 |
| resourceName | サブスクリプションの名前。 | はい |
| resourceType | トピックまたはサブスクリプション | はい |
| policyOptions | オプションの構造 | はい |
例
<cscript>
cred = {
projectId : "<your project id>",
credentialJsonFilePath : "<path to your credentials json file>"
}
conf = {
serviceName : "pubsub"
}
pubsubclient=getCloudService(cred, conf)
topicName = "topic-add-policy"
subscriptionName = "sub-#topicName#"
topic = pubsubClient.createTopic(topicName)
subscriptionAttributes = {
subscriptionName : "#subscriptionName#",
topicName : "#topicName#"
}
subscription = topic.subscribe(subscriptionAttributes)
policyMapV1 = {
version = "1",
resourceName : subscriptionName,
resourceType : "Subscription",
binding : [{
ROLE : "roles/editor",
MEMBERS : ["user:pn@adb.com"]
},
{
ROLE : "roles/owner",
MEMBERS : ["user:pn@adb.com"]
},
{
ROLE : "roles/viewer",
MEMBERS : ["user:mj@adb.com","user:pn@adb.com"]
}]
}
setIAMPolicy = pubsubClient.setIAMPolicy(policyMapV1)
cfdump(var=#setIAMPolicy#, label="set policy")
iAMPolicy = pubsubClient.getIAMPolicy({
resourceName : subscriptionName,
resourceType : "Subscription",
policyOptions : {
requestedPolicyVersion : policyMapV1[VERSION]
}
})
cfdump(var=#iAMPolicy#, label="get policy")
</cfscript>
IAM 権限のテスト
シンタックス
testIAMPermissions(parameterStruct)
パラメーター
| パラメーター | 説明 | 必須 |
| resourceName | サブスクリプションの名前。 | はい |
| resourceType | トピックまたはサブスクリプション | はい |
| permissions | リソースを確認するための一連の権限。 |
はい |
例
<cfscript>
cred = {
projectId : "<your project id>",
credentialJsonFilePath : "<path to your credentials json file>"
}
conf = {
serviceName : "pubsub"
}
pubsubclient=getCloudService(cred, conf)
topicName = "topic-add-policy"
subscriptionName = "sub-#topicName#"
topic = pubsubClient.createTopic(topicName)
subscriptionAttributes = {
subscriptionName : "#subscriptionName#",
topicName : "#topicName#"
}
subscription = topic.subscribe(subscriptionAttributes)
policyMapV1 = {
version = "1",
resourceName : subscriptionName,
resourceType : "Subscription",
binding : [{
ROLE : "roles/editor",
MEMBERS : ["user:pn@example.com"]
},
{
ROLE : "roles/owner",
MEMBERS : ["user:pn@example.com"]
},
{
ROLE : "roles/viewer",
MEMBERS : ["user:mj@example.com","user:pn@example.com"]
}]
}
setIAMPolicy = pubsubClient.setIAMPolicy(policyMapV1)
cfdump(var=#setIAMPolicy#, label="set policy")
iAMPolicy = pubsubClient.getIAMPolicy({
resourceName : subscriptionName,
resourceType : "Subscription",
policyOptions : {
requestedPolicyVersion : policyMapV1[VERSION]
}
})
cfdump(var=#iAMPolicy#, label="get policy")
addIamPolicy = {
resourceName : "#subName#",,
resourceType : "Subscription",
bindings : {
role : "roles/editor",
members : ["user:mj@adb.com"]
}
}
iAMPolicy = pubsubClient.addIAMPolicy(addIamPolicy);
cfdump(var=#iAMPolicy#, label="add policy")
testIAMPermissions = {
resourceName : "#subName#",
resourceType : "Subscription",
permissions : ["pubsub.topics.attachSubscription", "pubsub.topics.publish", "pubsub.topics.update"]
}
permissions = pubsubClient.testIAMPermissions(testIAMPermissions);
WriteDump(permissions);
</cfscript>
メッセージの取り込み
シンタックス
pullMessages(structMetadata)
パラメーター
| パラメーター | 説明 | 必須 |
| subscriptionName |
サブスクリプションの名前。 | はい |
| maxMessages |
プルされるメッセージの最大数。 | はい |
| returnImmediately |
サブスクリプションに maxMessages に対応する十分なメッセージが含まれていない場合でも、すぐにメッセージを返します。 |
いいえ |
例
<cfscript>
cred = {
projectId : "<your project id>",
credentialJsonFilePath : "<path to your credentials json file>"
}
conf = {
serviceName : "pubsub"
}
pubsubclient = getCloudService(cred, conf)
topic = pubsubclient.getTopic("myTopic")
subscription = topic.getSubscription("mySubscription”)
pullMessageMetadata = {
maxMessages: 100
}
pullMsgResponse = subscription.pullMessages(pullMessageMetadata)
for(message in pullMsgResponse.messages) {
WriteOutput(message.ackId)
WriteOutput(message.messageId)
WriteOutput(message.data)
WriteOutput(message.publishTime)
</cfscript>
メッセージの確認
シンタックス
acknowledgeMessages(acknowledgeMetadata)
パラメーター
| パラメーター | 説明 | 必須 |
| subscriptionName |
サブスクリプションの名前。 | はい |
| ackIds |
確認応答 ID の配列。 |
はい |
例
<cfscript>
cred = {
projectId : "<your project id>",
credentialJsonFilePath : "<path to your credentials json file>"
}
conf = {
serviceName : "pubsub"
}
pubsubclient = getCloudService(cred, conf)
topic = pubsubclient.getTopic("myTopic")
subscription = topic.getSubscription(“mySubscription”)
pullMessageMetadata = {
maxMessages: 100
}
pullMsgResponse = subscription.pullMessages(pullMessageMetadata)
ackIds = arrayNew(1)
for(message in pullMsgResponse.messages) {
ackId = message.ackId
ackIds.append(ackId)
WriteOutput(ackId)
WriteOutput(message.messageId)
WriteOutput(message.data)
WriteOutput(message.publishTime)
}
acknowledgeMessageMetadata = {
ackIds: ackIds
}
ackMsgResponse = subscription.acknowledgeMessages(acknowledgeMessageMetadata)
writeDump(ackMsgResponse)
</cfscript>
確認応答期限の変更
シンタックス
modifyAckDeadline(ackDeadlineMetadata)
パラメーター
| パラメーター | 必須 | 説明 |
| subscriptionName |
サブスクリプションの名前。 | はい |
| ackIds |
確認応答 ID の配列。 |
はい |
| ackDeadlineSeconds |
確認応答が期限切れになるまでの時間。 | はい |
例
<cfscript>
cred = {
projectId : "<your project id>"
credentialJsonFilePath : "<path to your credentials json file>"
}
conf = {
serviceName : "pubsub"
}
pubsubclient = getCloudService(cred, conf)
topic = pubsubclient.getTopic("myTopic")
subscription = topic.getSubscription("mySubscription")
pullMessageMetadata = {
maxMessages: 100
}
pullMsgResponse = subscription.pullMessages(pullMessageMetadata)
ackIds = arrayNew(1)
for(message in pullMsgResponse.messages) {
ackId = message.ackId
ackIds.append(ackId)
WriteOutput(ackId)
WriteOutput(message.messageId)
WriteOutput(message.data)
WriteOutput(message.publishTime)
}
modifyAckDeadlineMetadata = {
ackIds: ackIds,
ackDeadlineSeconds: 80
}
WriteDump(subscription.modifyAckDeadline(modifyAckDeadlineMetadata));
acknowledgeMessageMetadata = {
ackIds: ackIds
}
ackMsgResponse = subscription.acknowledgeMessages(acknowledgeMessageMetadata)
writeDump(ackMsgResponse)
</cfscript>
スキーマの作成
シンタックス
createSchema(schemaStruct)
パラメーター
| パラメーター | 説明 | 必須 |
| schemaName | Avro および Protobuf のスキーマの名前。 | はい |
| type | 有効な値は、Avro または Protobuf です。 | はい |
| definition | スキーマ定義。定義を直接渡すことができます。 | はい |
例
<cfscript>
cred = {
projectId : "<your project id>",
credentialJsonFilePath : "<path to your credentials json file>"
}
conf = {
serviceName : "pubsub"
}
pubsubclient=getCloudService(cred, conf)
schemaName = "schemaOne"
schemaMetadata = {
schemaName: schemaName,
type: "protocol_Buffer",
Definition: 'syntax = "proto2"; message ProtocolBuffer {required string name = 1;}'
}
createResponse=pubsubClient.createSchema(schemaMetadata)
writeDump(createResponse)
</cfscript>
スキーマの取得
シンタックス
getSchema(schemaName)
パラメーター
- schemaName:スキーマの名前
例
<cfscript>
cred = {
projectId : "<your project id>",
credentialJsonFilePath : "<path to your credentials json file>"
}
conf = {
serviceName : "pubsub"
}
pubsubclient=getCloudService(cred, conf)
schemaName = "schemaOne"
schemaMetadata = {
schemaName: schemaName,
type: "protocol_Buffer",
Definition: 'syntax = "proto2"; message ProtocolBuffer {required string name = 1;}'
}
createResponse=pubsubClient.createSchema(schemaMetadata)
createResponse=pubsubClient.createSchema(schemaMetadata)
writeDump(createResponse)
schema = pubsubClient.getSchema(schemaName)
writeOutput("schema ID: " & schema.getID() & "<br>")
writeOutput("schema Name: " & schema.getName() & "<br>")
writeOutput("schema type: " & schema.getType() & "<br>")
writeOutput("schema Definition: " & schema.getDefinition() & "<br>")
</cfscript>
プロジェクト内のすべてのスキーマのリスト表示
シンタックス
listSchemas()
listSchemas(parameterStruct)
パラメーター
| パラメーター | 説明 | 必須 |
| pageSize | 表示されるスキーマの数。 | はい |
| nextPageToken | この結果の次の ページのアクセスに使用するトークン。 |
はい |
| schemaView | 有効な値は、Full または Basic です。 | はい |
例
<cfscript>
cred = {
projectId : "<your project id>",
credentialJsonFilePath : "<path to your credentials json file>"
}
conf = {
serviceName : "pubsub"
}
pubsubclient=getCloudService(cred, conf)
schemaName = "schemaOne"
schemaMetadata = {
schemaName: schemaName,
type: "protocol_Buffer",
Definition: 'syntax = "proto2"; message ProtocolBuffer {required string name = 1;}'
}
pubsubClient.createSchema(schemaMetadata)
schema = pubsubClient.getSchema(schemaName)
first2schemas = pubsubClient.listSchemas({
pageSize: 2,
schemaView : "full"
})
writeDump(first2schemas)
</cfscript>
スキーマの削除
シンタックス
deleteSchema(schemaName)
パラメーター
- schemaName:スキーマの名前
例
<cfscript>
cred = {
projectId : "<your project id>",
credentialJsonFilePath : "<path to your credentials json file>"
}
conf = {
serviceName : "pubsub"
}
pubsubclient=getCloudService(cred, conf)
schemaName = "schemaOne"
schemaMetadata = {
schemaName: schemaName,
type: "protocol_Buffer",
Definition: 'syntax = "proto2"; message ProtocolBuffer {required string name = 1;}'
}
pubsubClient.createSchema(schemaMetadata)
deleteResponse=pubsubClient.deleteSchema(schemaName)
writeDump(deleteResponse)
</cfscript>
スキーマの検証
シンタックス
validateSchema(schemaMetadata)
パラメーター
| パラメーター | 説明 | 必須 |
| schemaName |
スキーマの名前。 | はい |
| type |
AVRO または Protobuf | はい |
| definition |
スキーマ定義。 | はい |
例
<cfscript>
cred = {
projectId : "<your project id>",
credentialJsonFilePath : "<path to your credentials json file>"
}
conf = {
serviceName : "pubsub"
}
pubsubclient = getCloudService(cred, conf)
schemaMetadata = {
schemaName: "mySchema",
type: “AVRO”,
definition: ‘{
"type" : "record",
"name" : "Avro",
"fields" : [ {
"name" : "StringField”,
"type" : “string”
}, {
"name" : "IntField",
"type" : "int"
} ]
}’
}
schema = pubsubclient.validateSchema(schemaMetadata);
WriteDump(schema);
</cfscript>
メッセージの検証
シンタックス
validateMessage(validateMetadata)
パラメーター
| パラメーター | 説明 | 必須 |
| schemaName |
スキーマの名前。 | はい |
| schemaEncoding |
JSON またはバイナリ | はい |
| data |
検証するデータ。 | はい |
例
<cfscript>
cred = {
projectId : "<your project id>",
credentialJsonFilePath : "<path to your credentials json file>"
}
conf = {
serviceName : "pubsub"
}
pubsubclient = getCloudService(cred, conf)
validateMessageMetadata = {
schemaName: "mySchema",
schemaEncoding: "JSON",
data: ‘{ "name" : "charlie"}’
}
schema = pubsubclient.validateMessage(validateMessageMetadata);
WriteDump(schema);
</cfscript>
スキーマオブジェクトのメソッド
- createSchema(struct)
- deleteSchema(string)
- getSchema(string)
- getSchema(string, string)
- listSchemas()
- listSchemas(struct)
- validateSchema(struct)