Amazon Kinesis

2018/08/02 - Amazon Kinesis - 5 new2 updated api methods

Changes  Update kinesis client to latest version

ListStreamConsumers (new) Link ¶

Lists the consumers registered to receive data from a stream using enhanced fan-out, and provides information about each consumer.

This operation has a limit of 10 transactions per second per account.

See also: AWS API Documentation

Request Syntax

client.list_stream_consumers(
    StreamARN='string',
    NextToken='string',
    MaxResults=123,
    StreamCreationTimestamp=datetime(2015, 1, 1)
)
type StreamARN:

string

param StreamARN:

[REQUIRED]

The ARN of the Kinesis data stream for which you want to list the registered consumers. For more information, see Amazon Resource Names (ARNs) and AWS Service Namespaces.

type NextToken:

string

param NextToken:

When the number of consumers that are registered with the data stream is greater than the default value for the MaxResults parameter, or if you explicitly specify a value for MaxResults that is less than the number of consumers that are registered with the data stream, the response includes a pagination token named NextToken. You can specify this NextToken value in a subsequent call to ListStreamConsumers to list the next set of registered consumers.

Don't specify StreamName or StreamCreationTimestamp if you specify NextToken because the latter unambiguously identifies the stream.

You can optionally specify a value for the MaxResults parameter when you specify NextToken. If you specify a MaxResults value that is less than the number of consumers that the operation returns if you don't specify MaxResults, the response will contain a new NextToken value. You can use the new NextToken value in a subsequent call to the ListStreamConsumers operation to list the next set of consumers.

type MaxResults:

integer

param MaxResults:

The maximum number of consumers that you want a single call of ListStreamConsumers to return.

type StreamCreationTimestamp:

datetime

param StreamCreationTimestamp:

Specify this input parameter to distinguish data streams that have the same name. For example, if you create a data stream and then delete it, and you later create another data stream with the same name, you can use this input parameter to specify which of the two streams you want to list the consumers for.

You can't specify this parameter if you specify the NextToken parameter.

rtype:

dict

returns:

Response Syntax

{
    'Consumers': [
        {
            'ConsumerName': 'string',
            'ConsumerARN': 'string',
            'ConsumerStatus': 'CREATING'|'DELETING'|'ACTIVE',
            'ConsumerCreationTimestamp': datetime(2015, 1, 1)
        },
    ],
    'NextToken': 'string'
}

Response Structure

  • (dict) --

    • Consumers (list) --

      An array of JSON objects. Each object represents one registered consumer.

      • (dict) --

        An object that represents the details of the consumer you registered.

        • ConsumerName (string) --

          The name of the consumer is something you choose when you register the consumer.

        • ConsumerARN (string) --

          When you register a consumer, Kinesis Data Streams generates an ARN for it. You need this ARN to be able to call SubscribeToShard.

          If you delete a consumer and then create a new one with the same name, it won't have the same ARN. That's because consumer ARNs contain the creation timestamp. This is important to keep in mind if you have IAM policies that reference consumer ARNs.

        • ConsumerStatus (string) --

          A consumer can't read data while in the CREATING or DELETING states.

        • ConsumerCreationTimestamp (datetime) --

    • NextToken (string) --

      When the number of consumers that are registered with the data stream is greater than the default value for the MaxResults parameter, or if you explicitly specify a value for MaxResults that is less than the number of registered consumers, the response includes a pagination token named NextToken. You can specify this NextToken value in a subsequent call to ListStreamConsumers to list the next set of registered consumers. For more information about the use of this pagination token when calling the ListStreamConsumers operation, see ListStreamConsumersInput$NextToken.

DeregisterStreamConsumer (new) Link ¶

To deregister a consumer, provide its ARN. Alternatively, you can provide the ARN of the data stream and the name you gave the consumer when you registered it. You may also provide all three parameters, as long as they don't conflict with each other. If you don't know the name or ARN of the consumer that you want to deregister, you can use the ListStreamConsumers operation to get a list of the descriptions of all the consumers that are currently registered with a given data stream. The description of a consumer contains its name and ARN.

This operation has a limit of five transactions per second per account.

See also: AWS API Documentation

Request Syntax

client.deregister_stream_consumer(
    StreamARN='string',
    ConsumerName='string',
    ConsumerARN='string'
)
type StreamARN:

string

param StreamARN:

The ARN of the Kinesis data stream that the consumer is registered with. For more information, see Amazon Resource Names (ARNs) and AWS Service Namespaces.

type ConsumerName:

string

param ConsumerName:

The name that you gave to the consumer.

type ConsumerARN:

string

param ConsumerARN:

The ARN returned by Kinesis Data Streams when you registered the consumer. If you don't know the ARN of the consumer that you want to deregister, you can use the ListStreamConsumers operation to get a list of the descriptions of all the consumers that are currently registered with a given data stream. The description of a consumer contains its ARN.

returns:

None

DescribeStreamConsumer (new) Link ¶

To get the description of a registered consumer, provide the ARN of the consumer. Alternatively, you can provide the ARN of the data stream and the name you gave the consumer when you registered it. You may also provide all three parameters, as long as they don't conflict with each other. If you don't know the name or ARN of the consumer that you want to describe, you can use the ListStreamConsumers operation to get a list of the descriptions of all the consumers that are currently registered with a given data stream.

This operation has a limit of 20 transactions per second per account.

See also: AWS API Documentation

Request Syntax

client.describe_stream_consumer(
    StreamARN='string',
    ConsumerName='string',
    ConsumerARN='string'
)
type StreamARN:

string

param StreamARN:

The ARN of the Kinesis data stream that the consumer is registered with. For more information, see Amazon Resource Names (ARNs) and AWS Service Namespaces.

type ConsumerName:

string

param ConsumerName:

The name that you gave to the consumer.

type ConsumerARN:

string

param ConsumerARN:

The ARN returned by Kinesis Data Streams when you registered the consumer.

rtype:

dict

returns:

Response Syntax

{
    'ConsumerDescription': {
        'ConsumerName': 'string',
        'ConsumerARN': 'string',
        'ConsumerStatus': 'CREATING'|'DELETING'|'ACTIVE',
        'ConsumerCreationTimestamp': datetime(2015, 1, 1),
        'StreamARN': 'string'
    }
}

Response Structure

  • (dict) --

    • ConsumerDescription (dict) --

      An object that represents the details of the consumer.

      • ConsumerName (string) --

        The name of the consumer is something you choose when you register the consumer.

      • ConsumerARN (string) --

        When you register a consumer, Kinesis Data Streams generates an ARN for it. You need this ARN to be able to call SubscribeToShard.

        If you delete a consumer and then create a new one with the same name, it won't have the same ARN. That's because consumer ARNs contain the creation timestamp. This is important to keep in mind if you have IAM policies that reference consumer ARNs.

      • ConsumerStatus (string) --

        A consumer can't read data while in the CREATING or DELETING states.

      • ConsumerCreationTimestamp (datetime) --

      • StreamARN (string) --

        The ARN of the stream with which you registered the consumer.

SubscribeToShard (new) Link ¶

Call this operation from your consumer after you call RegisterStreamConsumer to register the consumer with Kinesis Data Streams. If the call succeeds, your consumer starts receiving events of type SubscribeToShardEvent for up to 5 minutes, after which time you need to call SubscribeToShard again to renew the subscription if you want to continue to receive records.

You can make one call to SubscribeToShard per second per ConsumerARN. If your call succeeds, and then you call the operation again less than 5 seconds later, the second call generates a ResourceInUseException. If you call the operation a second time more than 5 seconds after the first call succeeds, the second call succeeds and the first connection gets shut down.

See also: AWS API Documentation

Request Syntax

client.subscribe_to_shard(
    ConsumerARN='string',
    ShardId='string',
    StartingPosition={
        'Type': 'AT_SEQUENCE_NUMBER'|'AFTER_SEQUENCE_NUMBER'|'TRIM_HORIZON'|'LATEST'|'AT_TIMESTAMP',
        'SequenceNumber': 'string',
        'Timestamp': datetime(2015, 1, 1)
    }
)
type ConsumerARN:

string

param ConsumerARN:

[REQUIRED]

For this parameter, use the value you obtained when you called RegisterStreamConsumer.

type ShardId:

string

param ShardId:

[REQUIRED]

The ID of the shard you want to subscribe to. To see a list of all the shards for a given stream, use ListShards.

type StartingPosition:

dict

param StartingPosition:

[REQUIRED]

  • Type (string) -- [REQUIRED]

  • SequenceNumber (string) --

  • Timestamp (datetime) --

rtype:

dict

returns:

The response of this operation contains an :class:`.EventStream` member. When iterated the :class:`.EventStream` will yield events based on the structure below, where only one of the top level keys will be present for any given event.

Response Syntax

{
    'EventStream': EventStream({
        'SubscribeToShardEvent': {
            'Records': [
                {
                    'SequenceNumber': 'string',
                    'ApproximateArrivalTimestamp': datetime(2015, 1, 1),
                    'Data': b'bytes',
                    'PartitionKey': 'string',
                    'EncryptionType': 'NONE'|'KMS'
                },
            ],
            'ContinuationSequenceNumber': 'string',
            'MillisBehindLatest': 123
        },
        'ResourceNotFoundException': {
            'message': 'string'
        },
        'ResourceInUseException': {
            'message': 'string'
        },
        'KMSDisabledException': {
            'message': 'string'
        },
        'KMSInvalidStateException': {
            'message': 'string'
        },
        'KMSAccessDeniedException': {
            'message': 'string'
        },
        'KMSNotFoundException': {
            'message': 'string'
        },
        'KMSOptInRequired': {
            'message': 'string'
        },
        'KMSThrottlingException': {
            'message': 'string'
        },
        'InternalFailureException': {
            'message': 'string'
        }
    })
}

Response Structure

  • (dict) --

    • EventStream (:class:`.EventStream`) --

      The event stream that your consumer can use to read records from the shard.

      • SubscribeToShardEvent (dict) --

        After you call SubscribeToShard, Kinesis Data Streams sends events of this type to your consumer.

        • Records (list) --

          • (dict) --

            The unit of data of the Kinesis data stream, which is composed of a sequence number, a partition key, and a data blob.

            • SequenceNumber (string) --

              The unique identifier of the record within its shard.

            • ApproximateArrivalTimestamp (datetime) --

              The approximate time that the record was inserted into the stream.

            • Data (bytes) --

              The data blob. The data in the blob is both opaque and immutable to Kinesis Data Streams, which does not inspect, interpret, or change the data in the blob in any way. When the data blob (the payload before base64-encoding) is added to the partition key size, the total size must not exceed the maximum record size (1 MB).

            • PartitionKey (string) --

              Identifies which shard in the stream the data record is assigned to.

            • EncryptionType (string) --

              The encryption type used on the record. This parameter can be one of the following values:

              • NONE: Do not encrypt the records in the stream.

              • KMS: Use server-side encryption on the records in the stream using a customer-managed AWS KMS key.

        • ContinuationSequenceNumber (string) --

          Use this as StartingSequenceNumber in the next call to SubscribeToShard.

        • MillisBehindLatest (integer) --

          The number of milliseconds the read records are from the tip of the stream, indicating how far behind current time the consumer is. A value of zero indicates that record processing is caught up, and there are no new records to process at this moment.

      • ResourceNotFoundException (dict) --

        The requested resource could not be found. The stream might not be specified correctly.

        • message (string) --

          A message that provides information about the error.

      • ResourceInUseException (dict) --

        The resource is not available for this operation. For successful operation, the resource must be in the ACTIVE state.

        • message (string) --

          A message that provides information about the error.

      • KMSDisabledException (dict) --

        The request was rejected because the specified customer master key (CMK) isn't enabled.

        • message (string) --

          A message that provides information about the error.

      • KMSInvalidStateException (dict) --

        The request was rejected because the state of the specified resource isn't valid for this request. For more information, see How Key State Affects Use of a Customer Master Key in the AWS Key Management Service Developer Guide.

        • message (string) --

          A message that provides information about the error.

      • KMSAccessDeniedException (dict) --

        The ciphertext references a key that doesn't exist or that you don't have access to.

        • message (string) --

          A message that provides information about the error.

      • KMSNotFoundException (dict) --

        The request was rejected because the specified entity or resource can't be found.

        • message (string) --

          A message that provides information about the error.

      • KMSOptInRequired (dict) --

        The AWS access key ID needs a subscription for the service.

        • message (string) --

          A message that provides information about the error.

      • KMSThrottlingException (dict) --

        The request was denied due to request throttling. For more information about throttling, see Limits in the AWS Key Management Service Developer Guide.

        • message (string) --

          A message that provides information about the error.

      • InternalFailureException (dict) --

        • message (string) --

RegisterStreamConsumer (new) Link ¶

Registers a consumer with a Kinesis data stream. When you use this operation, the consumer you register can read data from the stream at a rate of up to 2 MiB per second. This rate is unaffected by the total number of consumers that read from the same stream.

You can register up to 5 consumers per stream. A given consumer can only be registered with one stream.

This operation has a limit of five transactions per second per account.

See also: AWS API Documentation

Request Syntax

client.register_stream_consumer(
    StreamARN='string',
    ConsumerName='string'
)
type StreamARN:

string

param StreamARN:

[REQUIRED]

The ARN of the Kinesis data stream that you want to register the consumer with. For more info, see Amazon Resource Names (ARNs) and AWS Service Namespaces.

type ConsumerName:

string

param ConsumerName:

[REQUIRED]

For a given Kinesis data stream, each consumer must have a unique name. However, consumer names don't have to be unique across data streams.

rtype:

dict

returns:

Response Syntax

{
    'Consumer': {
        'ConsumerName': 'string',
        'ConsumerARN': 'string',
        'ConsumerStatus': 'CREATING'|'DELETING'|'ACTIVE',
        'ConsumerCreationTimestamp': datetime(2015, 1, 1)
    }
}

Response Structure

  • (dict) --

    • Consumer (dict) --

      An object that represents the details of the consumer you registered. When you register a consumer, it gets an ARN that is generated by Kinesis Data Streams.

      • ConsumerName (string) --

        The name of the consumer is something you choose when you register the consumer.

      • ConsumerARN (string) --

        When you register a consumer, Kinesis Data Streams generates an ARN for it. You need this ARN to be able to call SubscribeToShard.

        If you delete a consumer and then create a new one with the same name, it won't have the same ARN. That's because consumer ARNs contain the creation timestamp. This is important to keep in mind if you have IAM policies that reference consumer ARNs.

      • ConsumerStatus (string) --

        A consumer can't read data while in the CREATING or DELETING states.

      • ConsumerCreationTimestamp (datetime) --

DeleteStream (updated) Link ¶
Changes (request)
{'EnforceConsumerDeletion': 'boolean'}

Deletes a Kinesis data stream and all its shards and data. You must shut down any applications that are operating on the stream before you delete the stream. If an application attempts to operate on a deleted stream, it receives the exception ResourceNotFoundException.

If the stream is in the ACTIVE state, you can delete it. After a DeleteStream request, the specified stream is in the DELETING state until Kinesis Data Streams completes the deletion.

Note: Kinesis Data Streams might continue to accept data read and write operations, such as PutRecord, PutRecords, and GetRecords, on a stream in the DELETING state until the stream deletion is complete.

When you delete a stream, any shards in that stream are also deleted, and any tags are dissociated from the stream.

You can use the DescribeStream operation to check the state of the stream, which is returned in StreamStatus.

DeleteStream has a limit of five transactions per second per account.

See also: AWS API Documentation

Request Syntax

client.delete_stream(
    StreamName='string',
    EnforceConsumerDeletion=True|False
)
type StreamName:

string

param StreamName:

[REQUIRED]

The name of the stream to delete.

type EnforceConsumerDeletion:

boolean

param EnforceConsumerDeletion:

If this parameter is unset ( null) or if you set it to false, and the stream has registered consumers, the call to DeleteStream fails with a ResourceInUseException.

returns:

None

DescribeStreamSummary (updated) Link ¶
Changes (response)
{'StreamDescriptionSummary': {'ConsumerCount': 'integer'}}

Provides a summarized description of the specified Kinesis data stream without the shard list.

The information returned includes the stream name, Amazon Resource Name (ARN), status, record retention period, approximate creation time, monitoring, encryption details, and open shard count.

See also: AWS API Documentation

Request Syntax

client.describe_stream_summary(
    StreamName='string'
)
type StreamName:

string

param StreamName:

[REQUIRED]

The name of the stream to describe.

rtype:

dict

returns:

Response Syntax

{
    'StreamDescriptionSummary': {
        'StreamName': 'string',
        'StreamARN': 'string',
        'StreamStatus': 'CREATING'|'DELETING'|'ACTIVE'|'UPDATING',
        'RetentionPeriodHours': 123,
        'StreamCreationTimestamp': datetime(2015, 1, 1),
        'EnhancedMonitoring': [
            {
                'ShardLevelMetrics': [
                    'IncomingBytes'|'IncomingRecords'|'OutgoingBytes'|'OutgoingRecords'|'WriteProvisionedThroughputExceeded'|'ReadProvisionedThroughputExceeded'|'IteratorAgeMilliseconds'|'ALL',
                ]
            },
        ],
        'EncryptionType': 'NONE'|'KMS',
        'KeyId': 'string',
        'OpenShardCount': 123,
        'ConsumerCount': 123
    }
}

Response Structure

  • (dict) --

    • StreamDescriptionSummary (dict) --

      A StreamDescriptionSummary containing information about the stream.

      • StreamName (string) --

        The name of the stream being described.

      • StreamARN (string) --

        The Amazon Resource Name (ARN) for the stream being described.

      • StreamStatus (string) --

        The current status of the stream being described. The stream status is one of the following states:

        • CREATING - The stream is being created. Kinesis Data Streams immediately returns and sets StreamStatus to CREATING.

        • DELETING - The stream is being deleted. The specified stream is in the DELETING state until Kinesis Data Streams completes the deletion.

        • ACTIVE - The stream exists and is ready for read and write operations or deletion. You should perform read and write operations only on an ACTIVE stream.

        • UPDATING - Shards in the stream are being merged or split. Read and write operations continue to work while the stream is in the UPDATING state.

      • RetentionPeriodHours (integer) --

        The current retention period, in hours.

      • StreamCreationTimestamp (datetime) --

        The approximate time that the stream was created.

      • EnhancedMonitoring (list) --

        Represents the current enhanced monitoring settings of the stream.

        • (dict) --

          Represents enhanced metrics types.

          • ShardLevelMetrics (list) --

            List of shard-level metrics.

            The following are the valid shard-level metrics. The value " ALL" enhances every metric.

            • IncomingBytes

            • IncomingRecords

            • OutgoingBytes

            • OutgoingRecords

            • WriteProvisionedThroughputExceeded

            • ReadProvisionedThroughputExceeded

            • IteratorAgeMilliseconds

            • ALL

            For more information, see Monitoring the Amazon Kinesis Data Streams Service with Amazon CloudWatch in the Amazon Kinesis Data Streams Developer Guide.

            • (string) --

      • EncryptionType (string) --

        The encryption type used. This value is one of the following:

        • KMS

        • NONE

      • KeyId (string) --

        The GUID for the customer-managed AWS KMS key to use for encryption. This value can be a globally unique identifier, a fully specified ARN to either an alias or a key, or an alias name prefixed by "alias/".You can also use a master key owned by Kinesis Data Streams by specifying the alias aws/kinesis.

        • Key ARN example: arn:aws:kms:us-east-1:123456789012:key/12345678-1234-1234-1234-123456789012

        • Alias ARN example: arn:aws:kms:us-east-1:123456789012:alias/MyAliasName

        • Globally unique key ID example: 12345678-1234-1234-1234-123456789012

        • Alias name example: alias/MyAliasName

        • Master key owned by Kinesis Data Streams: alias/aws/kinesis

      • OpenShardCount (integer) --

        The number of open shards in the stream.

      • ConsumerCount (integer) --

        The number of enhanced fan-out consumers registered with the stream.