Atsushi2022の日記

データエンジニアリングに関連する記事を投稿してます

Azure DP-203 勉強メモ

データウェアハウスのアーキテクチャ

Datalake → Data Warehouse → BI

  • Datalake: Azure Storage Gen2
  • Data Warehouse: Azure Synapse Analytics
  • BI: PowerBI

Azure Storage Account

コンテナの種類が色々あるけど、データエンジニアリングに主に使うのはBlobコンテナ。

Blobコンテナだと、Gen2ファイルシステムに設定できる。

Azure Synapse AnalyticsではGen2ファイルシステムを用いる。

最低保存期間

https://learn.microsoft.com/ja-jp/azure/storage/blobs/access-tiers-overview#summary-of-access-tier-options

  • ホット アクセス層:特になし
  • クール アクセス層:最低30日間は保存する必要がある。
  • アーカイブ アクセス層:最低180日間は保存する必要がある。

アクセス方法

  • SAS
  • アクセスキー

Power BIからストレージアカウントへの接続することもできる。

Storage ExplorerはAzure Storage Account用のエクスプローラーツール。

Azure Data Lake Storage Gen2 のアクセス制御リスト

https://learn.microsoft.com/ja-jp/azure/storage/blobs/data-lake-storage-access-control

Azure Data Lake Storage Gen2 のクエリ アクセラレーション

https://learn.microsoft.com/ja-jp/azure/storage/blobs/data-lake-storage-query-acceleration

ライフサイクル管理ルールの定義

https://learn.microsoft.com/ja-jp/azure/storage/blobs/lifecycle-management-overview#sample-rule

  • tierToCooltierToArchivedeleteでクール、アーカイブ、削除に変更するタイミングを定義する。
{
  "rules": [
    {
      "enabled": true,
      "name": "sample-rule",
      "type": "Lifecycle",
      "definition": {
        "actions": {
          "version": {
            "delete": {
              "daysAfterCreationGreaterThan": 90
            }
          },
          "baseBlob": {
            "tierToCool": {
              "daysAfterModificationGreaterThan": 30
            },
            "tierToArchive": {
              "daysAfterModificationGreaterThan": 90,
              "daysAfterLastTierChangeGreaterThan": 7
            },
            "delete": {
              "daysAfterModificationGreaterThan": 2555
            }
          }
        },
        "filters": {
          "blobTypes": [
            "blockBlob"
          ],
          "prefixMatch": [
            "sample-container/blob1"
          ]
        }
      }
    }
  ]
}

SAS

Azure Storage の冗長性

https://learn.microsoft.com/ja-jp/azure/storage/common/storage-redundancy

  • RA-GRSではなく、GRSであってもフェイルオーバー中はセカンダリリージョンの読み取りアクセスが可能
  • RA-GRSなら障害が発生していなくても常にセカンダリ リージョンの読み取りアクセスが可能

ファイル サイズにおけるベストプラクティス

https://learn.microsoft.com/ja-jp/azure/storage/blobs/data-lake-storage-best-practices#file-size

  • ファイル サイズを大きくすると、パフォーマンスが向上し、コストが削減されます。
  • 一般的に、データを大きなサイズ (256 MB から 100 GB のサイズ) のファイルに整理するとパフォーマンスが向上します。
  • ファイル サイズを大きくすると、トランザクションのコストを削減することもできます。 読み取りと書き込みの操作は 4 MB 単位で課金されるため、ファイルのサイズが 4 MB であるか、数 KB であるかにかかわらず、操作に対して課金されます。

マネージドID

https://learn.microsoft.com/ja-jp/azure/active-directory/managed-identities-azure-resources/overview

  • AzureリソースでマネージドIDをイネーブルすると、リソースがAzure ADに登録され、IDが払いだされる。それがマネージドID。
  • 自動で割り当てられたり、ユーザによって割り当てられたりする。
  • マネージドIDを利用して、Azureリソースは他のリソースにアクセスできるようになる(IAM設定をした場合)
  • 例えば、Azure Synapse Analyticsの専用SQLからBlobストレージにアクセスする場合に、SASやアクセスキーを利用することもできるが、Azure Synapse AnalyticsワークスペースのマネージドIDに対してIAMで権限を付与するほうがよりセキュア。

Azure SQL

Azure SQLというくくりで、2種類のサービスが存在する。

  • Azure SQL Database
  • Azure SQL Managed Instance

Azure SQL Database

デプロイオプション

  • 単一データベース
    • フル マネージドの分離されたデータベース
  • エラスティックプール
    • CPU やメモリなどのリソースの共有セットを含む単一データベースのコレクション
  • Managed Instance

料金体系

  • vCore
    • ワークロードに応じて最適なメモリ、ストレージオプションを提供するモデル
  • DSU: データベーストランザクションユニット
    • ストレージの容量を固定し、定額料金で利用できるモデル

Azure SQL Managed Instance

スキップ

Transact-SQL

構文表記規則

https://learn.microsoft.com/ja-jp/sql/t-sql/language-elements/transact-sql-syntax-conventions-transact-sql?view=sql-server-ver16

[ ]がどういう時に使用されるのかわからなかったので調べたが、上記ドキュメントを読む限り省略可能なので、見やすさのために記載しているのか(?)

GOコマンド

Transact-SQLでは、SELECTした時点では値が返ってこない。

GOコマンドを実行するとSELECTの結果が取得できる。GOコマンドの時点までが実行される。

GOコマンドは前回のGOコマンドを実行した位置以降から有効となる。前回のGOコマンド以前に変数を定義しても、前回のGOコマンド以降の行ではその変数は使えない。

Azure Synapse Analytics

https://learn.microsoft.com/ja-jp/azure/synapse-analytics/overview-what-is

Azure Synapse Analyticsはこのサイトにある画像を貼り付ける。

以下のサービスを統合的に提供するサービス。

  • Synapse Studio
  • Synapse SQL (サーバーレス、専用)
  • Data Factory
  • Apache Spark for Azure Synapse
  • Azure Data Lake Storage Gen2
  • Azure Synapse Data Explorer

カスタマー マネージド キーを使用した二重暗号化

https://learn.microsoft.com/ja-jp/azure/synapse-analytics/security/workspaces-encryption#manage-the-workspace-customer-managed-key

  • ワークスペースはデフォルトでプラットフォーム マネージド キーにより暗号化されている。
  • カスタマー マネージド キーで暗号化設定することもできる。
  • まず、Azure Key Vaultでキーを生成する。
  • Synapse Analyticsワークスペース作成時に、セキュリティタブで「カスタマー マネージド キーを使用して二重暗号化」を有効にする。
  • Synapse Analyticsワークスペースが作成されている間に、Azure Key Vaultで生成したキーのアクセスポリシーを設定する Azure Synapse AnalyticsのマネージドIDに対して、"Get"、"Wrap"、"Unwrap" のアクセス許可を付与する。

Synapse SQL

このドキュメントにわかりやすくまとめられている。

https://learn.microsoft.com/ja-jp/azure/synapse-analytics/sql/overview-architecture

Synapse SQLでは、複数のノードにデータの計算処理を分散。

コンピューティングをストレージから切り離すことで、コンピューティングをスケーリングできる。

コンピューティングノードはすべてのユーザーデータをAzure Storage に保存し、並行クエリを実行する。

データは Azure Storage によって保存および管理されるため、ストレージの使用量が別途課金される。

Synapse SQLには以下の2種類が存在する。

  • サーバレスSQLプール
  • 専用SQLプール

サーバレスは、blobコンテナなどにファイルをおいて、それを外部テーブルとして読み出すので、データ自体は外部にある。 一方、専用SQLプールはサーバをたてて、そこにデータを置く。専用SQLプールはちゃんとユーザを作成して権限を与える必要がある。さらに、ワークロードグループやワークロードクラシファイアーなども出てくる。よくわからないので、確認要!

専用SQLプール

分散処理(ディストリビューション

クエリリクエストは、コントロールノードで受付られる。コントロールノードは複数あるコンピュートノードにリクエストを振り分けて並行処理させる。SQLプールには複数のコンピュートノードが存在する(もちろん1つのみに設定することもできる)。

一方、データは60個のディストリビューションに分割して保持される。ディストリビューションはAzure Storageに保存される。

データが分割されているのでコンピュートノードが並行処理することができる。例えば、コンピュートノードとディストリビューションを1対1で対応させるといった感じで並行処理する。

さらにこの構成により、コンピュートとストレージを別々にスケールさせることができる。

均等に分散したデータを含むディストリビューション列を選択する

https://learn.microsoft.com/ja-jp/azure/synapse-analytics/sql-data-warehouse/sql-data-warehouse-tables-distribute#choose-a-distribution-column-with-data-that-distributes-evenly

ディストリビューションの選択にあたっては次を参考にする。

  • 多数の一意の値を含む。
  • NULL がない、または少ししか NULL がない。
  • 日付列ではない。

テーブルの種類

SQL poolのテーブルは3種類ある。

  • ラウンドロビン:物理的に異なるストレージに順繰りにいれていく
  • ハッシュテーブル:特定の列をハッシュ値に変換して、ハッシュ値ごとにストレージをわける
  • レプリケイトテーブル:すべてのストレージにデータを入れる

デフォルトではラウンドロビンテーブルになっている。SSMSのテーブルアイコンでテーブルタイプを見分けることができる

DBCC PDW_SHOWSPACEUSED('テーブル名')で、テーブルの各レコードが保管されているノードIDを確認できる。

ラウンドロビンテーブルに対して、GROUP BY句で値ごとにグループ化しようとすると、同じ値でも別のノードに分散してしまっているため、データシャッフリングが必要となる(データをシャッフルして、同じ値を同じノードに集める)

WITH句でテーブルタイプを指定できる。

CREATE TABLE [dbo].SalesFact WITH
(
DISTRIBUTION = HASH (CustomerID) )

ツール

  • SSMS (SQL Server Management Studio)
    • PostgreSQLでいうところのpgAdminのようなツール。

パーティション

データをパーティションと呼ばれる小さなグループにしておく。

WHERE句でフィルターするときに、パーティションを使用すると読み込むデータを絞ることができ、高速化できる。

通常は日付でパーティションされる。

ディストリビューションパーティションあたり、最低でも100万行は必要。

CREATE PARTITION FUNCTION myRangePF1 (datetime2(0))  
    AS RANGE RIGHT FOR VALUES ('2022-04-01', '2022-05-01', '2022-06-01') ;  
GO  

CREATE PARTITION SCHEME myRangePS1  
    AS PARTITION myRangePF1  
    ALL TO ('PRIMARY') ;  
GO  

CREATE TABLE dbo.PartitionTable (col1 datetime2(0) PRIMARY KEY, col2 char(10))  
    ON myRangePS1 (col1) ;  
GO

https://learn.microsoft.com/ja-jp/sql/relational-databases/partitions/partitioned-tables-and-indexes?view=sql-server-ver16#partition-function

  • LEFT 範囲は、間隔値がデータベース エンジンによって左から右への昇順で並べ替えられる場合に、境界値が境界値間隔の左側に属することを指定します。 つまり、境界の最大値がパーティション内に含まれます。
  • RIGHT 範囲は、間隔値がデータベース エンジンによって左から右への昇順で並べ替えられる場合に、境界値が境界値間隔の右側に属することを指定します。 つまり、境界値が最も小さい値が各パーティションに含まれます。

パーティションの切り替え

https://learn.microsoft.com/ja-jp/azure/synapse-analytics/sql-data-warehouse/sql-data-warehouse-tables-partition#partition-switching

CREATE TABLE [dbo].[FactInternetSales_20000101_20010101]
    WITH    (   DISTRIBUTION = HASH([ProductKey])
            ,   CLUSTERED COLUMNSTORE INDEX
            ,   PARTITION   (   [OrderDateKey] RANGE RIGHT FOR VALUES
                                (20000101,20010101
                                )
                            )
            )
AS
SELECT  *
FROM    [dbo].[FactInternetSales_20000101]
WHERE   [OrderDateKey] >= 20000101
AND     [OrderDateKey] <  20010101;

ALTER TABLE dbo.FactInternetSales_20000101_20010101 SWITCH PARTITION 2 TO dbo.FactInternetSales PARTITION 2;

列志向

Azure SQL Databaseは行志向データベースだが、Azure Synapse AnalyticsのSQL Poolは列志向データベースになっている。

インデックス

クラスター化された列ストアインデックス

  • デフォルトでは、列ストアインデックスが提供される
  • データ型がvarchar(max)、nvarchar(max)、varbinary(max)の場合はクラスター化された列ストアインデックスは使用できない
  • テーブルに付き、1つしかインデックスは作成できない
  • インデックス作成時に特定の列を指定しない
CREATE TABLE myTable
  (  
    id int NOT NULL,  
    lastName varchar(20),  
    zipCode varchar(6)  
  )  
WITH ( CLUSTERED COLUMNSTORE INDEX );

クラスター化されたインデックス

  • クラスター化された列ストアインデックスが作成できない場合は、こちら。
  • テーブルに付き、1つしかインデックスは作成できない
  • インデックス作成時に特定の列を指定する
CREATE TABLE myTable
  (  
    id int NOT NULL,  
    lastName varchar(20),  
    zipCode varchar(6)  
  )  
WITH ( CLUSTERED INDEX (id) );

クラスターインデックス

  • クラスター化インデックスとは異なり、複数の非クラスターインデックスを作成可能
  • クラスター化インデックスとは違い、インデックス用のビューがテーブルとは別に作成される
CREATE INDEX zipCodeIndex ON myTable (zipCode);

ヒープテーブル

https://learn.microsoft.com/ja-jp/sql/relational-databases/indexes/heaps-tables-without-clustered-indexes?view=sql-server-ver16

  • キャッシュ上にテーブルを作成することで、データ書き込みと読み取りを高速化できる。
  • 本番のテーブルにロードする前のデータ変換などに利用する。
  • ヒープではクラスター化インデックスを使用できない。
  • クラスター化インデックスを作成することができる。
CREATE TABLE <テーブル名> (
  ...
) 
WITH ( HEAP )

古いバージョンの専用SQLプール(旧称 SQL DW)の暗号化

https://learn.microsoft.com/ja-jp/azure/synapse-analytics/sql-data-warehouse/sql-data-warehouse-overview-manage-security#encryption

データマスキング

https://learn.microsoft.com/ja-jp/azure/azure-sql/database/dynamic-data-masking-overview?view=azuresql

  • 組み込みの関数で簡単にデータをマスキングできる。
  • Eメールや電話番号などが含まれる列を自動で見つけ出してマスキングのレコメンドもしてくれる。
  • 管理者権限をもっていると、マスキングが適用されない。
  • 以下のようにSQLでもマスキングを追加できる。
  • データマスクは、見た目上わからなくなってはいるが、暗号化はされていないことに注意。
ALTER COLUMN [Phone Number] ADD MASKED WITH (FUNCTION = 'partial(5,"XXXXXXX",0)'

https://learn.microsoft.com/en-us/sql/relational-databases/security/dynamic-data-masking?view=sql-server-2017

監査

https://learn.microsoft.com/ja-jp/azure/azure-sql/database/auditing-overview?view=azuresql#setup-auditing

  • 簡単にログアナリティクスやストレージに監査ログを貯めることができる機能

設定方法

  • 対象のAzure Synapse Analyticsの左側メニューで、Security配下のAuditingを選択する。
  • 監査機能をイネーブルする。
  • Log Analytics等を選択し、保存先を指定する。

データの検出と分類

https://learn.microsoft.com/ja-jp/azure/azure-sql/database/data-discovery-and-classification-overview?view=azuresql

  • どのテーブルにどのような機密データが保持されているかを登録しておくことができる。
  • 「データの検出と分類」をクリックすると、自動でカラムを読み取って、センシティブな情報を保持するカラムを見つけてくれる。
  • そのうえで、分類のレコメンデーションをしてくれる。

Azure AD認証で専用SQLへアクセス

https://learn.microsoft.com/ja-jp/azure/synapse-analytics/sql/active-directory-authentication

  • Azure ADのアカウントで専用SQLにアクセスできる(SQL認証のアカウント/パスワードがいらない)
  • SSMSの認証画面で「Azure Active Directory - Universal with MFA」を選択して、いつもAzureにアクセスしているアカウントを入力する。
  • 事前に以下のように専用SQLでユーザーを作成し、ロールを割り当てておく必要がある。
CREATE USER [xxxx@xxxxxx.onmicrosoft.com]
FROM EXTERNAL PROVIDER
WITH DEFAULT_SCHEMA = dbo;

CREATE ROLE [NewRole]
GRANT SELECT ON SCHEMA [dbo] TO [NewRole]
EXEC sp_addrolemember N'NewRole', N'xxxx@xxxxxx.onmicrosoft.com'

https://learn.microsoft.com/ja-jp/azure/azure-sql/database/authentication-aad-configure?view=azuresql&tabs=azure-powershell#create-contained-users-mapped-to-azure-ad-identities

行レベルセキュリティ

https://learn.microsoft.com/ja-jp/sql/relational-databases/security/row-level-security?toc=%2Fazure%2Fsynapse-analytics%2Fsql-data-warehouse%2Ftoc.json&bc=%2Fazure%2Fsynapse-analytics%2Fsql-data-warehouse%2Fbreadcrumb%2Ftoc.json&view=sql-server-ver16#Typical

  • 簡単に言うと、WHERE句を利用して、クエリを実行するユーザの名前が含まれる行のみを返すようにする。

列レベルセキュリティ

https://learn.microsoft.com/ja-jp/azure/synapse-analytics/sql-data-warehouse/column-level-security?view=sql-server-ver16#example

  • GRANT文で特定の列のみSELECTを許可することで、列レベルのセキュリティを実現する。
GRANT SELECT ON Membership(MemberID, FirstName, LastName, Phone, Email) TO TestUser;

マネージドIDを利用した外部テーブルの作成

https://learn.microsoft.com/en-us/azure/synapse-analytics/sql/develop-storage-files-storage-access-control?tabs=user-identity#access-a-data-source-using-credentials

  • SASではなく、マネージドIDを利用して外部テーブルを作成することができる。
  • 下記のIDENTITY = 'Managed Identity'というところで、マネージドID認証を指定している。
  • Azure Synapse Analyticsワークスペースには、事前にストレージへのアクセス権限をIAMで付与しておく必要がある(必要であれば、ストレージのACLの権限も。)
CREATE DATABASE SCOPED CREDENTIAL SynapseWorkspaceIdentity
WITH IDENTITY = 'Managed Identity'


CREATE EXTERNAL DATA SOURCE log_data_managed
WITH (    LOCATION   = 'https://<storage_account>.dfs.core.windows.net/<container>/<path>'
CREDENTIAL = SynapseWorkspaceIdentity,
TYPE = HADOOP
)

CREATE EXTERNAL FILE FORMAT TextFileFormatManaged WITH (
      FORMAT_TYPE = DELIMETEDTEXT,
      FORMAT_OPTIONS (
        FIELD_TERMINATOR = ',',
        FIRST_ROW=2
      )
)

CREATE EXTERNAL TABLE logdatamanaged
(
  ...
)
WITH (
  LOCATION = 'data/Log.csv',
  DATA_SOURCE = log_data_managed,
  FILE_FORMAT = TextFileFormatManaged
)

動的管理ビューを使用したワークロードの確認

https://learn.microsoft.com/ja-jp/azure/synapse-analytics/sql-data-warehouse/sql-data-warehouse-manage-monitor

  • 動的管理ビュー (DMV) を使用して、専用 SQL プールでのクエリ実行といったワークロードを確認できる。

結果セットのキャッシュ

https://learn.microsoft.com/ja-jp/azure/synapse-analytics/sql-data-warehouse/performance-tuning-result-set-caching

  • 結果セットのキャッシュをイネーブルすることができる。
  • 自動的にクエリ結果をデータベース上にキャッシュする。
  • 高速化できる。
  • 結果セットのキャッシュが使用されない場合には、48時間毎に消去される。
  • または、結果セットのキャッシュがいっぱいになったら削除される。
-- キャッシュがイネーブルされているデータベースを確認する
-- キャッシュ ヒットの場合は 1、キャッシュ ミスの場合は 0、結果セットのキャッシュが使用されなかった理由については負の値が返される。
SELECT name, is_result_set_caching_on FROM sys.databases

-- データベースのクエリ ストアを有効にする
-- masterデータベース上で実行する。
-- https://learn.microsoft.com/ja-jp/sql/t-sql/statements/alter-database-transact-sql-set-options?toc=%2Fazure%2Fsynapse-analytics%2Fsql-data-warehouse%2Ftoc.json&bc=%2Fazure%2Fsynapse-analytics%2Fsql-data-warehouse%2Fbreadcrumb%2Ftoc.json&view=azure-sqldw-latest&preserve-view=true#enable-query-store-for-a-database
ALTER DATABASE [database_name]
SET QUERY_STORE = ON;

-- データベースに対して結果セットのキャッシュを有効にする
ALTER DATABASE [database_name]
SET RESULT_SET_CACHING ON;

ワークロード管理

https://learn.microsoft.com/ja-jp/azure/synapse-analytics/sql-data-warehouse/sql-data-warehouse-workload-classification

復元ポイントからの復元

https://learn.microsoft.com/ja-jp/azure/synapse-analytics/sql-data-warehouse/backup-and-restore#restoring-from-restore-points

復元ポイントのリテンション期間
復元ポイントのリテンション期間の詳細は次のとおりです。

- 専用 SQL プールは、7 日間の保有期間に達したときと、復元ポイントの合計数が少なくとも 42 個ある場合 (ユーザー定義と自動の両方を含む) に、復元ポイントを削除します。
- 専用 SQL プールが一時停止しているときはスナップショットは取得されません。
- 復元ポイントの経過時間は、復元ポイントが取得された時点 (SQL プールが一時停止された時点を含む) からのカレンダーの絶対日数によって測定されます。
- 任意の時点で、復元ポイントが 7 日間の保有期間に達していない限り、専用 SQL プールは最大 42 個のユーザー定義の復元ポイントまたは 42 個の自動復元ポイントを格納できることが保証されています
- スナップショットが取得され、専用 SQL プールが 7 日間を超えて一時停止されてから再開した場合、復元ポイントの数が合計で 42 になるまで (ユーザー定義と自動の両方を含む)、復元ポイントが維持されます。

IDENTITY を使用して、Azure Synapse Analytics の専用 SQL プールで代理キーを作成する

https://learn.microsoft.com/ja-jp/azure/synapse-analytics/sql-data-warehouse/sql-data-warehouse-tables-identity

  • テーブルの代理キーは、各行の一意の識別子を持つ列
  • IDENTITY プロパティを使うと、この目的を簡単かつ効果的に達成でき、読み込みのパフォーマンスが影響を受けることもありません。
CREATE TABLE dbo.T1
(    C1 INT IDENTITY(1,1) NOT NULL
,    C2 INT NULL
)
WITH
(   DISTRIBUTION = HASH(C2)
,   CLUSTERED COLUMNSTORE INDEX
)
;

Azure AD の ID で専用SQLに接続する。

Azure AD ID にマップされる包含ユーザーを作成する

https://learn.microsoft.com/ja-jp/azure/azure-sql/database/authentication-aad-configure?tabs=azure-powershell&view=azuresql#create-contained-users-mapped-to-azure-ad-identities

  • SQL Database で Azure Active Directory 認証を使用するには、Azure AD ID に基づく包含データベース ユーザーを使用する必要があります。
  • 包含データベース ユーザーは、master データベース内にログインを持たず、データベースに関連付けられている Azure AD の ID にマップされます。
  • Azure AD の ID には、個々のユーザー アカウントにもグループ アカウントにもなります。
CREATE USER [appName] FROM EXTERNAL PROVIDER;

Azure AD の ID で SSMS または SSDT を利用して接続する

https://learn.microsoft.com/ja-jp/azure/azure-sql/database/authentication-aad-configure?tabs=azure-powershell&view=azuresql#create-contained-users-mapped-to-azure-ad-identities

  • Azure AD の ID を使用して SQL Database に接続する。

キャッシュ ヒットと使用率

https://learn.microsoft.com/ja-jp/azure/synapse-analytics/sql-data-warehouse/sql-data-warehouse-how-to-monitor-cache#cache-hit-and-used-percentage

  • キャッシュヒット率とキャッシュ使用率の関係についてまとめられている。

サーバレスSQLプール

OPENROWSET を使用して JSON ファイルを読み取る

https://learn.microsoft.com/ja-jp/azure/synapse-analytics/sql/query-json-files

  • OPENROWSETで行区切りの JSON ファイルを読み取り、個別の行として返す。
select top 10 *
from openrowset(
        bulk 'https://pandemicdatalake.blob.core.windows.net/public/curated/covid-19/ecdc_cases/latest/ecdc_cases.jsonl',
        format = 'csv',
        fieldterminator ='0x0b',
        fieldquote = '0x0b'
    ) with (doc nvarchar(max)) as rows

https://learn.microsoft.com/ja-jp/sql/relational-databases/json/import-json-documents-into-sql-server?view=sql-server-ver16

  • OPENROWSET では、ファイルから 1 つのテキスト値が読み取られます。
  • OPENROWSET では値が BulkColumn として返され、OPENJSON 関数に BulkColumn が渡されます。
  • OPENJSON では、BulkColumn 配列内の JSON オブジェクトの配列が反復処理される。

外部テーブルの作成

https://learn.microsoft.com/ja-jp/azure/synapse-analytics/sql/develop-tables-external-tables?tabs=hadoop#arguments-create-external-table

CREATE EXTERNAL TABLE { database_name.schema_name.table_name | schema_name.table_name | table_name }
    ( <column_definition> [ ,...n ] )  
    WITH (
        LOCATION = 'folder_or_filepath',  
        DATA_SOURCE = external_data_source_name,  
        FILE_FORMAT = external_file_format_name
        [, TABLE_OPTIONS = N'{"READ_OPTIONS":["ALLOW_INCONSISTENT_READS"]}' ]
        [, <reject_options> [ ,...n ] ] 
    )
[;] 

<column_definition> ::=
column_name <data_type>
    [ COLLATE collation_name ]

<reject_options> ::=  
{  
    | REJECT_TYPE = value,  
    | REJECT_VALUE = reject_value,  
    | REJECT_SAMPLE_VALUE = reject_sample_value,
    | REJECTED_ROW_LOCATION = '/REJECT_Directory'
}
  • LOCATION = 'folder_or_filepath'
    • Azure Blob Storage にある実際のデータのフォルダーまたはファイル パスとファイル名を指定
    • Hadoop 外部テーブルとは異なり、ネイティブ外部テーブルでは、パスの最後に /** を指定しない限り、サブフォルダーが返されません。

https://learn.microsoft.com/ja-jp/azure/synapse-analytics/sql/query-folders-multiple-csv-files

  • フォルダー内のすべてのファイルを読み取ったり、フォルダーを再帰的に読み取ったり、複数のフォルダーからファイルを読み取るといったこともできる。

https://learn.microsoft.com/ja-jp/sql/t-sql/statements/create-external-table-transact-sql?view=sql-server-ver16&tabs=dedicated#limitations-and-restrictions

  • 外部テーブルに対しては、以下のデータ定義言語 (DDL) ステートメントのみを使用できます。
    • CREATE TABLE および DROP TABLE
    • CREATE STATISTICS および DROP STATISTICS
    • CREATE VIEW および DROP VIEW
  • 他のDDL(ALTER、TRUNCATE)はサポートされていない。
  • データ操作言語 (DML) の削除、挿入、更新の操作もサポートされていない。

Azure Synapse Sparkプール の外部テーブル定義を同期する

https://learn.microsoft.com/ja-jp/azure/synapse-analytics/sql/develop-storage-files-spark-tables

  • Parquet または CSV に基づく、Azure Storage に配置された各 Spark 外部テーブルに対して、サーバーレス SQLプール データベースに外部テーブルが作成されます。
  • そのため、Spark プールをシャットダウンしても、サーバーレス SQL プールから Spark 外部テーブルに対するクエリを引き続き実行できます。

PolyBase

https://learn.microsoft.com/ja-jp/sql/relational-databases/polybase/polybase-guide?view=sql-server-ver16

  • SQLサーバーからBlobストレージ等にある外部データを直接照会できる。

https://learn.microsoft.com/ja-jp/azure/synapse-analytics/sql-data-warehouse/sql-data-warehouse-service-capacity-limits#loads

  • Polybase では、1 MB 未満の行を読み込みます。1行が 1MB を超えることはできない。

Pivotによる縦持ちから横持ちへの変換

https://learn.microsoft.com/ja-jp/sql/t-sql/queries/from-using-pivot-and-unpivot?view=sql-server-ver16

  • PIVOT では、式内の 1 つの列にある複数の一意の値を出力内の複数の列に変えることにより、テーブル値式が行列変換されます。
  • UNPIVOT 関係演算子の機能は PIVOT 関係演算子の逆で、テーブル値式の複数の列を列値に行列変換します。
  • PIVOT の構文は、SELECT...CASE ステートメントを複雑に組み合わせて同じ操作を指定する場合よりも単純で読みやすくなります。

https://style.potepan.com/articles/25072.html

透過的な暗号化

https://learn.microsoft.com/ja-jp/azure/azure-sql/database/transparent-data-encryption-tde-overview?tabs=azure-portal&view=azuresql

  • データベース、バックアップ、ログ ファイルを暗号化し、バックアップやログからデータに直接アクセスできないようにする。
  • デフォルトで有効

Always Encrypted

https://learn.microsoft.com/ja-jp/sql/relational-databases/security/encryption/always-encrypted-database-engine?view=sql-server-ver15

  • クレジット カード番号や国の識別番号などの機密データを保護するために設計された機能
  • Always Encryptedを使用すると、クライアントはクライアント アプリケーション内の機密データを暗号化し、暗号化キーをデータベース エンジンに公開することはありません。
  • これにより、データを所有し、それを表示できるユーザーと、データを管理するがアクセス権を持たない必要があるユーザー (オンプレミスのデータベース管理者、クラウド データベースオペレーター、またはその他の高い特権を持つ未承認のユーザー) を分離できます。
  • その結果、Always Encryptedにより、顧客は自信を持って機密データをクラウドに格納し、悪意のある内部関係者によるデータ盗難の可能性を減らすことができます。

決定論的暗号化とランダム暗号化

https://learn.microsoft.com/ja-jp/sql/relational-databases/security/encryption/always-encrypted-database-engine?view=sql-server-ver16#selecting--deterministic-or-randomized-encryption

  • 決定論的暗号化

    • 指定されたプレーンテキスト値に対して常に同じ暗号化値が生成される。
    • 暗号化された列で結合、グループ化、インデックス作成を行うことができる。
  • 暗号化をランダム化

    • ランダム化された暗号化は安全性が上がる。
    • 暗号化された列に対して検索、グループ化、インデックス作成、結合ができない。

Azure Synapse AnalyticsワークスペースIPアドレス制限

https://learn.microsoft.com/ja-jp/azure/synapse-analytics/security/synapse-workspace-ip-firewall

Data Factory

  • データ変換を行うサービス

構成

メインとなるコンポーネントは以下の通り。詳細は次のリンクを参照のこと。

https://learn.microsoft.com/ja-jp/azure/data-factory/introduction#top-level-concepts

  • Linked Service:データソースからデータを取り込む(Ingest)ため、あるいはデータデスティネーションへデータをロードするための接続文字情報を保有する。
  • Dataset:データ構造(Data Structure)を保持する。データそのものではなく、 データを参照する名前付きのビュー。
  • Activity:変換ロジック
  • Pipeline:複数のActivityを束ねる
  • Integration Runtime:データ抽出、変換、ロードを行うコンピューティングインフラストラクチャー
  • パイプライン実行:パイプラインを実行するインスタンス

Data Factory内で利用できるサービス

  • Data Flow
  • Power Query

統合ランライム

データ抽出、変換、ロードの処理は、コンピューティングはAzure統合ランタイムで行う。

統合ランタイムはまったく別の用途にも用いられる。VMにあるデータをSynapse Analyticsに連携する場合は、VMセルフホステッド統合ランタイムをインストールする必要がある。でないと、VMにあるデータを連携できない。

種類

  • Azure Integration Runtime(サーバーレス)
  • Azure マネージド VNET 統合ランタイム
  • セルフホステッド統合ランタイム

セルフホステッド統合ランタイムのセットアップ

まずデータファクトリの「管理」画面で、Intergration rutimesを選択する。

Newをクリックし、セルフホステッド統合ランタイムをセットアップする。

すると、認証キーが表示されるので、VMにインストールしたセルフホステッド統合ランタイムに認証キーを入力する。

これでVMがデータファクトリに登録される。

データファクトリ側では、新しいデータセットファイルファイルシステムを選択する。

パイプラインの作成と実行

コピーアクティビティのケースでは、「データのコピー」アクティビティに加えて、コピー元とコピー先のリンクされたサービス(計2個)とデータセット(計2個)を作成する。

リンクされたサービスとして、例えばストレージアカウントの接続文字列とアカウントキーを登録する。データセットとして、例えばリンクされたサービス、ストレージのファイルパス、区切り記号、エスケープ文字などを登録する。

上記リソースを作成したら、実行前に「発行」することで正式に登録されて実行可能になる。

今すぐにパイプラインを実行する場合は、「トリガーの追加」から「今すぐトリガー」をクリックする。

パイプラインの実行状況は「Monitor」で確認できる。Monitorでは詳細な実行履歴、エラーメッセージを確認できる。

コピーアクティビティ

  • Azure SQL databaseなどのDBをソースとして選択した場合、テーブルそのものを指定するのに加えて、クエリを記述して読み込むことができる。

コピーアクティビティにおけるファイル階層構造の保持

https://learn.microsoft.com/ja-jp/azure/data-factory/connector-azure-data-lake-storage?tabs=data-factory

  • Azure Data Lake Storageをシンクとして指定する場合に、copyBehaviorで、PreserveHierarchy、FlattenHierarchy、MergeFilesが選べる。
    • PreserveHierarchy (既定値):ターゲット フォルダー内でファイル階層を保持します。 ソース フォルダーへのソース ファイルの相対パスはターゲット フォルダーへのターゲット ファイルの相対パスと同じになります。
    • FlattenHierarchy:ソース フォルダーのすべてのファイルをターゲット フォルダーの第一レベルに配置します。 ターゲット ファイルは、自動生成された名前になります。
    • MergeFiles:ソース フォルダーのすべてのファイルを 1 つのファイルにマージします。 ファイル名を指定した場合、マージされたファイル名は指定した名前になります。 それ以外は自動生成されたファイル名になります。

Get Metadataアクティビティ

BlobストレージやAzure SQL DatabaseのメタデータをData flow内で取得できる。どういったメタデータが取得できるかはリソースによって異なる。詳細は以下を参照。Get Metadataアクティビティでストレージ内のファイル名をすべて取得し、For Eachアクティビティで各ファイルに対して処理を行うといったことができる。

https://learn.microsoft.com/en-us/azure/data-factory/control-flow-get-metadata-activity#supported-connectors

データフローアクティビティ

  • コピーアクティビティでクエリを記述することでもデータ変換は可能だが、Data Flowを使用すると、クエリをハードコードせずに済む。
  • コードを書かずにデータ変換できる。
  • Apache Sparkクラスターが作成され、クラスター上で実行される。Apache Sparkクラスターの起動に時間がかかる。
  • デバッグモードで変換結果を確認できる。
  • デバッグモードでも時間単位で課金される。
  • 数分しか使わなくても1時間分課金される。
  • 単価はvCore当たり。最低でもvCoreが8個使用される。

代表的な変換

すべての「変換」が以下にリストアップされている。

https://learn.microsoft.com/ja-jp/azure/data-factory/data-flow-transformation-overview

スキーマドリフト

"スキーマの誤差"のこと。コピーアクティビティのソースやシンクで、スキーマドリフトを許可することで、仮に列が追加や削除された場合であってもエラーにならずに誤差として扱うことができる。

マッピングAuto mappingがオンだと誤差の列もシンクに書き込まれる。

誤差列を参照するには派生列で列名をつける必要がある。Columnを$$、Expressionを$$と設定することで、入力された列名をそのまま設定することができる。

https://learn.microsoft.com/ja-jp/azure/data-factory/concepts-data-flow-schema-drift#map-drifted-columns-quick-action

データファクトリーにおけるGitの利用

Azure DevOpsのアカウントを作ってプロジェクトおよびリポジトリを作成し、データファクトリでManageGit ConfigurationでAzure DevOpsのGitリポジトリを設定することで、データファクトリで作成したパイプラインなどを保存できる。

https://learn.microsoft.com/ja-jp/azure/data-factory/source-control#author-with-azure-repos-git-integration

Azure DevOpsのAzure Pipelines Releasesを使用して、本番用に作成しておいたデータファクトリ環境に対して、デプロイを自動化できる。

https://learn.microsoft.com/ja-jp/azure/data-factory/continuous-integration-delivery-automate-azure-pipelines

料金

https://azure.microsoft.com/ja-jp/pricing/details/data-factory/data-pipeline/

  • アクティビティ実行回数とその実行時間に基づいて課金される
  • 統合ランタイムは3種類の構成があり、それぞれ料金が異なる
  • 別途、Data Flowも課金される。 Data Flowクラスターの実行時間とデバッグ時間に対して課金される。
  • 「Monitor」のパイプライン実行履歴で、どれくらい消費しているかがわかる。

増分コピー

https://learn.microsoft.com/ja-jp/azure/data-factory/tutorial-incremental-copy-lastmodified-copy-data-tool

  • タンブリングウィンドウを使用することで、定期的に過去のデータを取り出すことができる。
  • タンブリングウィンドウでは、開始日時はUTCとみなされるため、タイムゾーンの指定は不要。
  • ファイル読み込み動作の指定(File loading behavior)で、Incremental load: LastModifiedDateを選択する。

カスタマー マネージド キーを使用した 二重暗号化

https://learn.microsoft.com/ja-jp/azure/data-factory/enable-customer-managed-key

  • デフォルトでは、ランダムに生成されてデータ ファクトリに一意に割り当てられる Microsoft 管理のキーで暗号化される。
  • さらに、カスタマーマネージドキーを設定して二重に暗号化できる。
  • Azure Key Vaultでキーを生成し、データファクトリに対して"取得"、"キーを折り返す"、"キーの折り返しを解除"のアクセス許可する
  • データファクトリのManageからCustomer managed keyを選択し、カスタマーマネージドキーのURIを設定する。

Azure Monitorでの監視

https://learn.microsoft.com/ja-jp/azure/data-factory/monitor-using-azure-monitor

  • Data Factory では、パイプライン実行データを 45 日間だけ格納します。 データをより長期間保持する場合は、Azure Monitor を使用してください。
  • データファクトリで監視できるメトリックには次のようなものがある(PipelineFailedRunsなど) https://learn.microsoft.com/ja-jp/azure/data-factory/monitor-metrics-alerts

パイプラインに注釈をつける

https://learn.microsoft.com/ja-jp/azure/data-factory/concepts-annotations-user-properties?source=recommendations

  • 注釈は、パイプライン、データセット、リンク サービス、トリガーに割り当てることができる静的な値
  • [プロパティ] アイコンで [+ 新規] ボタンをクリックし、注釈に適切な名前を付けます。
  • [監視] タブに移動すると、[パイプラインの実行] でこの注釈をフィルター処理できます。
  • アクティビティ レベルで動的な値を監視することもできる。

Azure 統合ランタイムの作成

子パイプラインでのエラー

  • 親パイプラインから子パイプラインを呼び出し、子パイプラインがエラーとなった場合には、親パイプラインもエラーとして実行データに記録される。

オンプレとの連携

  • オンプレからデータを取り込むにはLinked serviceを作成する。この際、パイプラインやアクティビティは使用しない。

Azure Data Factory と Azure Synapse Analytics でのバイナリ

https://learn.microsoft.com/ja-jp/azure/data-factory/format-binary

  • バイナリ データセットは、Copy アクティビティ、GetMetadata アクティビティ、または Delete アクティビティで使用できます。
  • バイナリ データセットを使用する場合、サービスではファイルの内容は解析されず、そのまま処理されます。そのため処理が早い。

Azure Event Hubs

ストーリーミング送受信プラットフォーム。Apache Kafkaみたいな感じ。

名前空間を作成して、そこにEvent Hubを作成する。エンドポイントURLが払いだされるので、そこにデータを送信する。

いったん受信したイベントは設定した期限が経過されるまで削除されない。

チェックポイントを使用することで、一度読み出したイベントを再度読みださないようにできる。

複数のパーティションで並列して読み書きすることで、一度に処理できる量を増やしている。

Capture機能によるBlobストレージへの書き込み

Azure Event Hubs Capture を使用すると、Azure Stream Analyticsを使用しなくても、Event Hubs のストリーミング データをAzure Blob StorageまたはAzure Data Lake Storage Gen1Azure Data Lake Storage Gen2に自動的に配信できる。

https://learn.microsoft.com/ja-jp/azure/event-hubs/event-hubs-capture-enable-through-portal

イベント ハブ にパーティションを動的に追加する

https://learn.microsoft.com/ja-jp/azure/event-hubs/dynamically-add-partitions

  • パーティションの数は、イベント ハブの作成時に指定できます。
  • イベント ハブを作成した後に、パーティションを追加することもできる。
  • パーティションを動的に追加する方法は、Event Hubs の premium および専用レベルでのみ利用できます。

Azure Stream Analytics

ストリーミングデータに対して、リアルタイムにクエリを実行して出力できる。

読み取り開始時間をNowにすると、現在時刻後に受信したデータのみ取り出す。

読み取り開始時刻をカスタムに設定すると、過去のイベントも読み出しできる。

参照データを使用する

https://learn.microsoft.com/ja-jp/azure/stream-analytics/stream-analytics-use-reference-data

  • ストリーミングデータに参照データをJOINできる。
  • 参照データは、Azure Blob Storage および Azure SQL Databaseに配置する。
  • 参照データ入力を定義する方法については下記を参照する。

https://learn.microsoft.com/ja-jp/azure/stream-analytics/sql-reference-data

ウィンドウ関数

タンブリング ウィンドウ

一定時間ごとに受信したイベントをグループ化する。

各ウィンドウ時間は重なりあわない。

例)10秒ごとに区切って、発生したイベントを知りたい。

SELECT System.Timestamp() as WindowEndTime, TimeZone, COUNT(*) AS Count
FROM TwitterStream TIMESTAMP BY CreatedAt
GROUP BY TimeZone, TumblingWindow(second,10)

ホッピング ウィンドウ

一定時間ごとに受信したイベントをグループ化する。

各ウィンドウ時間は重なり合う。

例)過去10秒間に発生したイベント数を5秒間隔に知りたい。

SELECT System.Timestamp() as WindowEndTime, Topic, COUNT(*) AS Count
FROM TwitterStream TIMESTAMP BY CreatedAt
GROUP BY Topic, HoppingWindow(second,10,5)

スライディング ウィンドウ

イベントを受信したタイミングで一定時間さかのぼってグループ化する。

例)10秒間に3回以上イベントが発生したらアラートしたい。

SELECT System.Timestamp() as WindowEndTime, Topic, COUNT(*) AS Count
FROM TwitterStream TIMESTAMP BY CreatedAt
GROUP BY Topic, SlidingWindow(second,10)
HAVING COUNT(*) >=3

セッション ウィンドウ

最初のイベントが発生したときに開始されます。 最後にイベントが取り込まれてから指定されたタイムアウト期間内に別のイベントが発生した場合、ウィンドウはその新しいイベントを含むように拡張されます。 タイムアウト期間内にどのイベントも発生しなかった場合、ウィンドウはタイムアウト時に閉じられます。指定されたタイムアウト期間内にイベントが発生し続けた場合、セッション ウィンドウは最大期間に達するまで拡張し続けます。

例)5秒以上間隔が開いていないイベントの数を知りたい。

SELECT System.Timestamp() as WindowEndTime, Topic, COUNT(*) AS Count
FROM TwitterStream TIMESTAMP BY CreatedAt
GROUP BY Topic, SessionWindow(second,5,10)

スナップショット ウィンドウ

まったく同じ時間(病態ン位)に受信したイベントをグループ化する。

例)まったく同じタイミングで発生したイベントを知りたい。

SELECT System.Timestamp() as WindowEndTime, Topic, COUNT(*) AS Count
FROM TwitterStream TIMESTAMP BY CreatedAt
GROUP BY Topic, System.Timestamp()

https://learn.microsoft.com/ja-jp/azure/stream-analytics/stream-analytics-window-functions

OVER句

https://learn.microsoft.com/ja-jp/sql/t-sql/functions/lag-transact-sql?source=recommendations&view=sql-server-ver16

LAG/LAST句

https://learn.microsoft.com/ja-jp/stream-analytics-query/lag-azure-stream-analytics

https://learn.microsoft.com/ja-jp/stream-analytics-query/last-azure-stream-analytics

  • LAG 分析演算子を使用すると、特定の制約内でイベント ストリーム内の "前の" イベントを参照できる。
LAG(<scalar_expression >, [<offset >], [<default>])  
     OVER ([PARTITION BY <partition key>] LIMIT DURATION(<unit>, <length>) [WHEN boolean_expression])

例:以前の null 以外のセンサーの読み取り値を見つける

SELECT  
     sensorId,  
     LAG(reading) OVER (PARTITION BY sensorId LIMIT DURATION(hour, 1) WHEN reading IS NOT NULL)  
     FROM input

UDF

JavaScriptでユーザー定義関数を作成して、クエリで利用することができる。

https://learn.microsoft.com/ja-jp/azure/stream-analytics/stream-analytics-javascript-user-defined-functions

イベントの間隔を検出する

  • イベントの間隔を検出するためのクエリ

https://learn.microsoft.com/ja-jp/azure/stream-analytics/stream-analytics-stream-analytics-query-patterns#detect-the-duration-between-events

  • LAST関数

https://learn.microsoft.com/ja-jp/stream-analytics-query/last-azure-stream-analytics?toc=https%3A%2F%2Flearn.microsoft.com%2Fja-jp%2Fazure%2Fstream-analytics%2Ftoc.json&bc=https%3A%2F%2Flearn.microsoft.com%2Fja-jp%2Fazure%2Fbread%2Ftoc.json

Azure Stream Analyticsのメトリック

https://learn.microsoft.com/ja-jp/azure/stream-analytics/stream-analytics-job-metrics

  • 上記のメトリックを監視できる。
  • 特に以下のメトリックが重要。
  • バックログされた入力イベント
    • バックログされた入力イベントの数。 このメトリックの 0 以外の値は、ジョブが受信イベントの数に追いつかないことを意味します。 この値がゆっくり増加する場合や一貫して 0 以外である場合は、ジョブをスケールアウトする必要があります。 詳細については、「ストリーミング ユニットの理解と調整」を参照してください。
  • データ変換エラー
    • 想定の出力スキーマに変換できなかった出力イベントの数。 このシナリオに発生したイベントを削除するには、エラー ポリシーを Drop に変更します。
  • 初期入力イベント
    • アプリケーション タイム スタンプが受信時間より 5 分以上早いイベント。
  • 遅延入力イベント
    • 構成済みの到着遅延許容期間より後に到着したイベント。 詳細については Azure Stream Analytics のイベントの順序に関する考慮事項を確認してください。
  • 順不同のイベント
    • イベント順序ポリシーに基づいて、削除された、または調整されたタイムスタンプが付与された、順不同で受信したイベントの数。 このメトリックは、順不同の許容範囲ウィンドウ 設定の構成によって影響を受ける可能性があります。

ストリーミングユニット

  • ストリーミングユニットの使用率が100%に達すると、ジョブが失敗し始める
  • 常にストリーミングユニットの使用率を監視する必要がある

ストリーミングユニットに関するベストプラクティス

https://learn.microsoft.com/ja-jp/azure/stream-analytics/stream-analytics-streaming-unit-consumption

  • PARTITION BY を使用しないクエリでは、最低6つのストリーミングユニットを設定する。
  • 必要以上のストリーミングユニットを割り当てる必要がある。

イベント順序ポリシーの作成

https://learn.microsoft.com/ja-jp/azure/stream-analytics/event-ordering

  • イベント/アプリケーション時間は、イベント ペイロードに含まれるタイムスタンプです (イベントが生成された時間)
  • 到着時刻は、イベントが入力ソース (イベント ハブ/IoT ハブ/Blob ストレージ) に到達した時間のタイムスタンプです。
  • 既定では、Stream Analytics は到着時間でイベントを処理しますが、クエリで TIMESTAMP BY 節を使用することによって、イベント時間でイベントを処理することもできます。
  • 遅延着信と順不同のポリシーは、イベント時間でイベントを処理する場合にのみ適用できます
  • 遅延到着ポリシー
    • さまざまな理由によりイベントが遅れて到着することがあります。 たとえば、40 秒遅れて到着したイベントのイベント時間が 00:10:00 である場合、到着時間は 00:10:40 になります。 遅延到着ポリシーを 15 秒に設定した場合、15 秒よりも遅く到着するイベントは、削除されるか (Stream Analytics によっては処理されない)、そのイベント時間が調整されます。 上記の例では、イベントは 40 秒遅れて (ポリシー設定よりも遅れて) 到着したため、イベント時間は遅延到着ポリシーの最大値に合わせて 00:10:25 (到着時間 - 遅延到着ポリシーの値) に調整されます。 既定の遅延到着ポリシーは 5 秒です。
  • 順不同ポリシー
    • イベントは、順不同で到着する場合もあります。 イベント時間を遅延到着ポリシーに基づいて調整した後、順不同のイベントを自動的に削除または調整することもできます。 このポリシーを 8 秒に設定した場合、順不同であるものの、8 秒以内到着したイベントは、イベント時間で並べ替えられます。 設定した時間を超えたイベントは、削除されるか、順不同ポリシーの最大値に調整されます。 既定の順不同ポリシーは 0 秒です。

Azure Stream Analytics での時間の処理について

https://learn.microsoft.com/ja-jp/azure/stream-analytics/stream-analytics-time-handling

  • 基準値
    • イベントがどのポイントまでストリーミング プロセッサで受信されているかを示すイベント時間マーカーです。
  • 到着遅延イベント
    • 到着遅延許容期間の定義により、Azure Stream Analytics では、受信イベントごとにイベント時間が到着時間と比較されます。 イベント時間が許容期間外の場合は、イベントを削除するようにシステムを構成するか、イベントの時間を許容期間内になるように調整することができます。基準値が生成されると、サービスは基準値よりも低いイベント時間でイベントを受信する可能性があります。 これらのイベントを破棄するか、イベントの時間を基準値に調整するようにサービスを構成することができます。
  • 早期到着イベント
    • 到着遅延許容値ウィンドウの反対のような、早期到着ウィンドウと呼ばれるもう 1 つの概念にお気づきかもしれません。 このウィンドウは 5 分に固定されており、到着遅延許容値ウィンドウとは異なる目的を果たします。Azure Stream Analytics は完全な結果を保証しているため、ジョブの最初の出力時刻 (入力時刻ではなく) として、ジョブの開始時刻を指定するだけです。 ジョブの開始時刻は、完全なウィンドウ (ウィンドウの途中からだけでなく) が処理されるようにするために必要です。

ジョブの停止を監視

  • Runtime Errorメトリックでは監視できない。Runtime Errorメトリックは、ジョブがデータを受信できるけれどクエリの処理がエラーになっている場合にのみ適用される。
  • All Administrative operationsで監視できる。All Administrative operationの失敗ステータスを監視することにより、ジョブが予期せず停止した場合にアラートが発行される。

Azure IoT Hub

Azure IoT HubにIoTデバイスを登録すると、接続文字列が生成されるので、それをデバイス側で設定する。

バイスがストリームデータをIoT Hubに送ってくるので、Azure Stream Analyticsを使ってクエリを実行してデータを取り出す。

Scala

インストール

以下からダウンロードして、インストールする。

https://docs.scala-lang.org/getting-started/index.html

以下のコマンドでscalaの対話モードに入れる。

> scala3
Welcome to Scala 3.2.2 (19, Java Java HotSpot(TM) 64-Bit Server VM).
Type in expressions for evaluation. Or try :help.

scala> 3
val res0: Int = 3

scala> 4
val res1: Int = 4

scala> res0 + res1
val res2: Int = 7

var と val

varはmutableな変数。

valはimutableな変数(Javaでいうと、const付きの変数)。

varはvariableの略で、valはvalueの略。

valに再代入しようとすると、reassignment to valというエラーになる。

val

構文

  • if 構文
var i = 8

if (i < 10)
  {
    println("i is less than 10")
  }
else
  {
    println("i is more than and equal to 10")
  }
  • for 構文
for (i<-1 to 10)
  println("the value is " + i)
  • while 構文
var i = 0
while (i < 10) {
  println("the value is " + i)
  i += 1
}
  • case 構文
val i = 29
i match{
  case i if i<30 => println("i is less than 30")
  case i if i>=30 => println("i is more than and equal to 30")
}
  • 関数
def add(x:Int, y:Int): Int = {
  return x + y
}

val x = 2
val y = 1
println("x + y is " + add(x, y))
  • リスト
var numbers:List[Int] = List(0, 1, 2, 3, 4)

println("The number is " + numbers.head) // The number is 0
numbers.foreach{println} // 1 2 3 4
println("The number is " + numbers(2)) // The number is 1

var names:List[String] = List("Yamada", "Nishida", "Hanada")
names.foreach(println) // Yamada Nishida Hanada
println("The index is " + names.indexOf("Hanada")) // The index is 2

Azure Synapse Spark Pool

Apache Sparkは分散処理フレームワーク

Azure Synapse AnalyticsのSpark poolでSparkクラスターを動かすことができる。

DriverノードがSparkジョブを受付け、それをExecutorノードに振り分ける。

料金

仮想コア当たりの分単位課金。

Sparkプールはサーバレスなので、Sparkプールを作成しただけでは料金が発生しない。

Sparkジョブをサブミットしてはじめてノードが作成されて課金される。

最低ノード数量は3個で、1つはDriverノードになる。

Sparkでは、実行されるべきタイミングまでコードを実行しない。

開発

Azure Synapse AnalyticsでApache Sparkプールを作成する。

次に、Azure Synapse AnalyticsのSynapse Studioを開き、Developでノートブックを作成する。

ノートブックに作成済みのSpark poolを割り当てる。

言語でSpark (Scala)を選択する。

RDD (Resilient Deistributed Dateset)

次のようにsc.parallelize(data)とすることで、並列分散処理のため、データを各ノードに分散して配置できる。

val data = Array(1, 2, 3, 4, 5)

val rdd = sc.parallelize(data)

Spark データセット

並列で変換処理ができる。

変換処理により、新しいデータセットが作られる。

SparkデータセットScalaJavaで処理できる。

val data = Array(1, 2, 3, 4, 5)

val rdd = sc.parallelize(data)

val ds

Spark データフレーム

Sparkデータフレームは、Sparkデータセットに列名をつけたもの。

外部ファイルからデータフレームを作成できる。

SparkデータフレームはScalaJavaPython、Rで処理できる。

val data = Array(1, 2, 3, 4, 5)

val rdd = sc.parallelize(data)

val df = rdd.toDF() // データフレームに変換

RDD、データフレーム、データセットの比較についてはこちらを参照する。

https://yubessy.hatenablog.com/entry/2016/12/11/095915

列のソート

from pyspark.sql.functions import desc
from pyspark.sql.functions import col

sortted_df = df.sort(col("列名").desc())
display(sorted_df)

データのロード

Storege Gen2に配置されたLog.csvファイルをSparkデータフレームとしてロードする。

from pyspark.sql import SparkSession
from pyspark.sql.types import *

account_name = "datalake2000"
container_name = "data"
relative_path = "raw"
adls_path = 'abfss://%s@%s.dfs.core.windows.net/%s' % (container_name, account_name, relative_path)

spark.conf.set("fs.azure.account.auth.type.%s.dfs.core.windows.net" %account_name, "SharedKey")
spark.conf.set("fs.azure.account.key.%s.dfs.core.windows.net" %account_name ,"V0bi0fr1nxs3Ox4fbPUGNDtE5XlGqvYT9tJJt0hkYS2ncXmiJtcW5DO2OLzffWKLQ410oITK3Ra7xN9Qjn1hhA==")

df1 = spark.read.option('header', 'true') \
                .option('delimiter', ',') \
                .csv(adls_path + '/Log.csv')

display(df1)

SQLの使用

Sparkデータフレームをもとにビューを作成して、それに対してSQLクエリを実行できる。

df1.createOrReplaceTempView("logdata") # データフレームからビューを作成

sql_1=spark.sql("SELECT Operationname, count(Operationname) FROM logdata GROUP BY Operationname") # ビューに対してSELECT分をクエリ
sql_1.show()

# または

df1.createOrReplaceTempView("logdata") # データフレームからビューを作成

%%sql  # Jupyterノートブックのマジックコマンド。このようにマジックコマンドを指定することで、次のSQLクエリをSQL言語で実行できる。
SELECT Operationname, count(Operationname) FROM logdata GROUP BY Operationname

SparkプールからSQL専用プールへのロード

事前に専用SQLプールにテーブルが作成されているとエラーになる。

%%Spark # SparkというマジックコマンドでScala言語を使用できる

import com.microsoft.spark.sqlanalytics.utils.Constants
import org.apache.spark.sql.SqlAnalyticsConnector._

# データスキーマの設定
val dataSchema = StructType(Array(
    StructField("Id", IntegerType, true),
    StructField("Correlationid", StringType, true),
    StructField("Operationname", StringType, true),
    StructField("Status", StringType, true),
    StructField("Eventcategory", StringType, true),
    StructField("Level", StringType, true),
    StructField("Time", TimestampType, true),
    StructField("Subscription", StringType, true),
    StructField("Eventinitiatedby", StringType, true),
    StructField("Resourcetype", StringType, true),
    StructField("Resourcegroup", StringType, true)))


val df = spark.read.format("csv").option("header","true").schema(dataSchema).load("abfss://data@datalake2000.dfs.core.windows.net/raw/Log.csv") # Synapse Analytics内のDatalake Gen2を使用しているのでアクセスキーが不要

df.printSchema()

# データフレームを専用SQLプールにテーブルとしてロードする
df.write.sqlanalytics("<DBName>.<Schema>.<TableName>", <TableType>)

Sparkテーブルの作成

Sparkプールのメタストア上に、Sparkテーブルを作成することができる。

分析用の一時的なテーブルとして使用すること。Sparkプールが停止すると消えてしまう。

利点として、サーバレスSQLプールでSparkテーブルを共有して利用できる。

注意点:SparkテーブルではDatetime型が利用できない。

%%spark
val df = spark.read.sqlanalytics("newpool.dbo.logdata")  # 専用SQLプールのテーブルをもとにデータフレームを作成
df.write.mode("overwrite").saveAsTable("logdatainternal") # Sparkテーブルとして書き込み(上書きモード)

// Then we can reference the table via SQL commands

%%sql
SELECT * FROM logdatainternal # Sparkテーブルに対してSQLクエリを実行

次のようにSparkデータベースを作成し、そこにSparkテーブルを作成することもできる。

%%sql
CREATE DATABASE internaldb
CREATE TABLE internaldb.customer(Id int,name varchar(200)) USING Parquet

%%sql
INSERT INTO internaldb.customer VALUES(1,'UserA')

%%sql
SELECT * FROM internaldb.customer


// If you want to load data from the log.csv file and then save to a table
%%pyspark
df = spark.read.load('abfss://data@datalake2000.dfs.core.windows.net/raw/Log.csv', format='csv'
, header=True
)
df.write.mode("overwrite").saveAsTable("internaldb.logdatanew")

%%sql
SELECT * FROM internaldb.logdatanew

Azure Databricks

Databricksはクラウドサービス。AWSやAzureでも利用できる。

ワークスペースとして、Databricksクラスターを作成する。

クラスターにはSparkエンジンや他のコンポーネントがインストールされている。

  • Interactive Cluster
    • ノートブックを利用してインタラクティブにデータ分析が行える。 
    • Standard ClusterとHigh Concurrency Clusterがある
  • Job Cluster

    • ジョブを渡して実行させる。
    • ジョブが完了したら、ターミネートされる。
  • Standard Cluster

    • 障害などの影響が他のユーザーと分離されない
    • 単一のワークロードのみ実行するのに適する
    • Python、R、ScalaSQLに対応
    • インアクティブ状態が一定時間続くと、自動的に削除できる
    • 自動でスケーリングアウトできる(ワーカノードの数を増やす)
  • High Concurrency Cluster

    • 障害分離
    • 複数のユーザーでワークロードを実行するのに適する
    • Python、R、SQLに対応
    • テーブルアクセスコントロールに対応。PythonSQLに対してアクセス制御ができる
  • Single Node

    • ノードが1つだけ。
    • 1つのノードでドライバーノードとワーカノードを兼ねる

Databricks環境の構築

料金

  • Databricks Units(DBUs)とVMの数に応じて課金される。
  • 新しいクラスターでタスクを実行すると、タスクは Data Engineering (タスク) ワークロードとして扱われ、タスク ワークロードの料金が適用されます。
  • 既存の汎用クラスターでタスクを実行すると、タスクはデータ分析 (汎用) ワークロードとして扱われ、汎用ワークロードの料金が適用されます。

Databricks File System

  • オブジェクトストレージ上の抽象レイヤー
  • これを介して、オブジェクトストレージとやり取りできる
  • クラスターが削除されてもファイルは維持される
  • デフォルトのストレージロケーションはDBFSルートと呼べれる
  • 以下のDBFSルートが存在する
    • /FileStore:インポートされるファイルやライブラリなどを置く場所
    • /databricks-datasets:サンプルのパブリックデータセット
    • /user/hive/warehouse:Hiveテーブル用のデータやメタデータ
  • %fs lsでDBFS内のファイルをリスト表示できる
  • %fsがDBFS用のマジックコマンド

グラフなどの作成

Azure Databricksのノートブック上で、データフレームから簡単にグラフなどを作成できる。

https://learn.microsoft.com/ja-jp/azure/databricks/visualizations/

クラスターのピン留め

https://learn.microsoft.com/ja-jp/azure/databricks/clusters/clusters-manage#--pin-a-cluster

  • クラスターは、終了してから 30 日後に完全に削除される。
  • 終了後 30 日以上経過した後も汎用クラスターの構成を維持するために、クラスターをピン留めできる。

Azure Synapse への構造化ストリーミング書き込み

https://learn.microsoft.com/ja-jp/azure/databricks/getting-started/streaming

https://learn.microsoft.com/ja-jp/azure/databricks/structured-streaming/synapse

  • 1つめの参考サイトでは、構造化ストリーミングとは何かについて説明している。
  • Azure Databricksワークスペース内に受信しているストリーミングデータをDatabriksのインメモリテーブルに書き込んでいる。
  • 2つ目のサイトでは、なんらかの方法で受信したストリーミングデータをAzure Synapse AnalyticsのSQLテーブルに書き込んでいる。
# Set up the Blob storage account access key in the notebook session conf.
spark.conf.set(
  "fs.azure.account.key.<your-storage-account-name>.dfs.core.windows.net",
  "<your-storage-account-access-key>")

# Prepare streaming source; this could be Kafka or a simple rate stream.
df = spark.readStream \
  .format("rate") \
  .option("rowsPerSecond", "100000") \
  .option("numPartitions", "16") \
  .load()

# Apply some transformations to the data then use
# Structured Streaming API to continuously write the data to a table in Azure Synapse.

df.writeStream \
  .format("com.databricks.spark.sqldw") \
  .option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
  .option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>") \
  .option("forwardSparkAzureStorageCredentials", "true") \
  .option("dbTable", "<your-table-name>") \
  .option("checkpointLocation", "/tmp_checkpoint_location") \
  .start()

Azure Active Directory 資格情報パススルーを使用して Azure Data Lake Storage にアクセスする

https://learn.microsoft.com/ja-jp/azure/databricks/data-governance/credential-passthrough/adls-passthrough

  • Azure Databricks へのログインに使用したものと同じ Azure Active Directory (Azure AD) ID を使用して、Azure Databricks クラスターから Azure Data Lake Storage Gen2 (ADLS Gen2) に自動で認証することができます。
  • クラスターで Azure Data Lake Storage 資格情報パススルーを有効にすると、そのクラスターで実行するコマンドは、ストレージにアクセスするためのサービス プリンシパル資格情報を構成しなくても、Azure Data Lake Storage のデータの読み取りと書き込みができるようになります。
  • Azure Data Lake Storage資格情報パススルーは、Azure Data Lake Storage Gen1 と Gen2 でのみサポート。
  • Azure Databricks ワークスペースは、Premium プランでなければならない。

ファイル取込みに関するベストプラクティス

  • ファイルは、生のまま取り込んでRAWゾーンに保管する。その後、不要なデータを落としてFiltered Zondeに保管する。最後に変換/統合を行って、Curatedゾーンに保管する。
  • フォルダの階層は次のようにする。 /<ゾーン名>/<データソース名>/YYYY/MM/DD/<ファイル名>
  • Parquetなどの圧縮を用いる。
  • ファイルを分割し、複数のコンピュートノードに割り当てられるようにする。

Azure Monitor

  • メトリクスに対してアクショングループを作成して特定の条件を満たした場合にメールで通知を送ったりできる。
  • メトリクスに対してアラートを設定して異常を通知できる。

緩やかに変化するディメンション

https://learn.microsoft.com/ja-jp/power-bi/guidance/star-schema#slowly-changing-dimensions

https://en.wikipedia.org/wiki/Slowly_changing_dimension

  • SCD(Slowly Changes Dimensions)
  • 例えば、町の名前、製品の名前、税率などがある。

Type 0:変更を認めない

  • 属性は変更されることはなく、永続的な値を持つ

Type 1:上書き

  • 古いデータが新しいデータで上書きされるため、履歴データは追跡されない
  • 欠点は、変更の履歴が残らないこと。
  • 利点は、メンテナンスが容易であること。

変更前

顧客コード 顧客名 顧客本社都道府県
101 山下工務店 埼玉県

変更後(顧客本社都道府県を変更)

顧客コード 顧客名 顧客本社都道府県
101 山下工務店 東京都

Type 2:新しい行を追加

  • 代理キー(レコードの開始終了日)を使用し、履歴データを追跡する。
  • 無制限の履歴が保存される。
  • 他にもバージョン番号やフラグを使用する。
顧客コード 顧客名 顧客本社都道府県 開始日 終了日
101 山下工務店 埼玉県 2000-01-01T00:00:00 2004-12-22T00:00:00
101 山下工務店 東京都 2004-12-22T00:00:00 NULL

Type 3:新しい属性を追加

  • 列を追加して変更を追跡する。
  • 限られた履歴を保持する。
  • 「オリジナル」ではなく、「前回」とかでもOK。
顧客コード 顧客名 顧客本社都道府県_オリジナル 変更日 顧客本社都道府県_現在
101 山下工務店 埼玉県 2004-12-22T00:00:00 東京都

Type 4:履歴テーブルの追加

  • 1つのテーブルが現在のデータを保持。
  • 追加のテーブルが変更の記録を保持。

顧客テーブル

顧客コード 顧客名 顧客本社都道府県
101 山下工務店 東京都

顧客_履歴テーブル

顧客コード 顧客名 顧客本社都道府県 開始日
101 山下工務店 埼玉県 2000-01-01T00:00:00
101 山下工務店 東京都 2004-12-22T00:00:00

Type 5

  • タイプ1とタイプ4の組み合わせ
  • 詳細確認要

Type 6:複合アプローチ

  • タイプ1、2、3のアプローチを組み合わせたもの

Type 7:代理キーと自然キーの両方をファクト テーブルに配置

  • 詳細確認要

File Format

Parquet ファイル

https://learn.microsoft.com/ja-jp/azure/databricks/external-data/parquet

  • 分析ワークロードの大規模読み込みに最適化されている。

Avro ファイル

https://learn.microsoft.com/en-us/azure/databricks/external-data/avro

  • トランザクショナルワークロードの大規模書き込みに最適化されている。

Azure Information Protection

https://learn.microsoft.com/ja-jp/azure/information-protection/what-is-information-protection

  • Microsoft 365 の認証機能を利用して、ファイルや電子メールを保護するクラウドベースの暗号化サービス

Azure でのストリーム処理テクノロジの選択

https://learn.microsoft.com/ja-jp/azure/architecture/data-guide/technology-choices/stream-processing#general-capabilities

  • Javaでストリーミング処理開発を行いたい場合は、Azure DatabricksやSparkを利用する。