データウェアハウスのアーキテクチャ
Datalake → Data Warehouse → BI
- Datalake: Azure Storage Gen2
- Data Warehouse: Azure Synapse Analytics
- BI: PowerBI
Azure Storage Account
コンテナの種類が色々あるけど、データエンジニアリングに主に使うのはBlobコンテナ。
Blobコンテナだと、Gen2ファイルシステムに設定できる。
Azure Synapse AnalyticsではGen2ファイルシステムを用いる。
最低保存期間
- ホット アクセス層:特になし
- クール アクセス層:最低30日間は保存する必要がある。
- アーカイブ アクセス層:最低180日間は保存する必要がある。
アクセス方法
- SAS
- アクセスキー
Power BIからストレージアカウントへの接続することもできる。
Storage ExplorerはAzure Storage Account用のエクスプローラーツール。
Azure Data Lake Storage Gen2 のアクセス制御リスト
https://learn.microsoft.com/ja-jp/azure/storage/blobs/data-lake-storage-access-control
- IAMとは別に、ユーザやグループに対してACLを設定できる。
- コンテナ、フォルダ、ファイル単位でACLを設定できる。
- Azureポータルでの設定方法はこちら。 https://learn.microsoft.com/ja-jp/azure/storage/blobs/data-lake-storage-acl-azure-portal
- Azure Storage Explorerでの設定方法はこちら。 https://learn.microsoft.com/ja-jp/azure/storage/blobs/data-lake-storage-explorer-acl
Azure Data Lake Storage Gen2 のクエリ アクセラレーション
https://learn.microsoft.com/ja-jp/azure/storage/blobs/data-lake-storage-query-acceleration
- SDK(PythonやPowerShell等で利用可能)を使って、ストレージ内にあるCSV および JSON形式のファイルに対してクエリを実行し、不要な列と不要な行を省くことで、高速にデータを読み取ることができる。
- 特に設定は不要っぽくて、SDKを使うだけ(?)
- 1つのファイルのみ処理できる。JOINしたり、GROUP BYすることはできない。
- 通常の料金体系とは異なる。
- クエリアクセラレーションの使用方法はこちら。 https://learn.microsoft.com/ja-jp/azure/storage/blobs/data-lake-storage-query-acceleration-how-to?tabs=python
ライフサイクル管理ルールの定義
https://learn.microsoft.com/ja-jp/azure/storage/blobs/lifecycle-management-overview#sample-rule
tierToCool
、tierToArchive
、delete
でクール、アーカイブ、削除に変更するタイミングを定義する。
{ "rules": [ { "enabled": true, "name": "sample-rule", "type": "Lifecycle", "definition": { "actions": { "version": { "delete": { "daysAfterCreationGreaterThan": 90 } }, "baseBlob": { "tierToCool": { "daysAfterModificationGreaterThan": 30 }, "tierToArchive": { "daysAfterModificationGreaterThan": 90, "daysAfterLastTierChangeGreaterThan": 7 }, "delete": { "daysAfterModificationGreaterThan": 2555 } } }, "filters": { "blobTypes": [ "blockBlob" ], "prefixMatch": [ "sample-container/blob1" ] } } } ] }
SAS
-
- Azure Blob Storage、Azure Queue Storage、Azure Table Storage、またはAzure Filesのいずれかのストレージ サービス内のリソースへのアクセス権をもつSAS
- https://learn.microsoft.com/ja-jp/rest/api/storageservices/create-service-sas
Azure Storage の冗長性
https://learn.microsoft.com/ja-jp/azure/storage/common/storage-redundancy
ファイル サイズにおけるベストプラクティス
https://learn.microsoft.com/ja-jp/azure/storage/blobs/data-lake-storage-best-practices#file-size
- ファイル サイズを大きくすると、パフォーマンスが向上し、コストが削減されます。
- 一般的に、データを大きなサイズ (256 MB から 100 GB のサイズ) のファイルに整理するとパフォーマンスが向上します。
- ファイル サイズを大きくすると、トランザクションのコストを削減することもできます。 読み取りと書き込みの操作は 4 MB 単位で課金されるため、ファイルのサイズが 4 MB であるか、数 KB であるかにかかわらず、操作に対して課金されます。
マネージドID
https://learn.microsoft.com/ja-jp/azure/active-directory/managed-identities-azure-resources/overview
- AzureリソースでマネージドIDをイネーブルすると、リソースがAzure ADに登録され、IDが払いだされる。それがマネージドID。
- 自動で割り当てられたり、ユーザによって割り当てられたりする。
- マネージドIDを利用して、Azureリソースは他のリソースにアクセスできるようになる(IAM設定をした場合)
- 例えば、Azure Synapse Analyticsの専用SQLからBlobストレージにアクセスする場合に、SASやアクセスキーを利用することもできるが、Azure Synapse AnalyticsワークスペースのマネージドIDに対してIAMで権限を付与するほうがよりセキュア。
Azure SQL
Azure SQLというくくりで、2種類のサービスが存在する。
Azure SQL Database
デプロイオプション
- 単一データベース
- フル マネージドの分離されたデータベース
- エラスティックプール
- CPU やメモリなどのリソースの共有セットを含む単一データベースのコレクション
- Managed Instance
料金体系
- vCore
- ワークロードに応じて最適なメモリ、ストレージオプションを提供するモデル
- DSU: データベーストランザクションユニット
- ストレージの容量を固定し、定額料金で利用できるモデル
Azure SQL Managed Instance
スキップ
Transact-SQL
構文表記規則
[ ]
がどういう時に使用されるのかわからなかったので調べたが、上記ドキュメントを読む限り省略可能なので、見やすさのために記載しているのか(?)
GOコマンド
Transact-SQLでは、SELECTした時点では値が返ってこない。
GOコマンドを実行するとSELECTの結果が取得できる。GOコマンドの時点までが実行される。
GOコマンドは前回のGOコマンドを実行した位置以降から有効となる。前回のGOコマンド以前に変数を定義しても、前回のGOコマンド以降の行ではその変数は使えない。
Azure Synapse Analytics
https://learn.microsoft.com/ja-jp/azure/synapse-analytics/overview-what-is
Azure Synapse Analyticsはこのサイトにある画像を貼り付ける。
以下のサービスを統合的に提供するサービス。
- Synapse Studio
- Synapse SQL (サーバーレス、専用)
- Data Factory
- Apache Spark for Azure Synapse
- Azure Data Lake Storage Gen2
- Azure Synapse Data Explorer
カスタマー マネージド キーを使用した二重暗号化
- ワークスペースはデフォルトでプラットフォーム マネージド キーにより暗号化されている。
- カスタマー マネージド キーで暗号化設定することもできる。
- まず、Azure Key Vaultでキーを生成する。
- Synapse Analyticsワークスペース作成時に、セキュリティタブで「カスタマー マネージド キーを使用して二重暗号化」を有効にする。
- Synapse Analyticsワークスペースが作成されている間に、Azure Key Vaultで生成したキーのアクセスポリシーを設定する Azure Synapse AnalyticsのマネージドIDに対して、"Get"、"Wrap"、"Unwrap" のアクセス許可を付与する。
Synapse SQL
このドキュメントにわかりやすくまとめられている。
https://learn.microsoft.com/ja-jp/azure/synapse-analytics/sql/overview-architecture
Synapse SQLでは、複数のノードにデータの計算処理を分散。
コンピューティングをストレージから切り離すことで、コンピューティングをスケーリングできる。
コンピューティングノードはすべてのユーザーデータをAzure Storage に保存し、並行クエリを実行する。
データは Azure Storage によって保存および管理されるため、ストレージの使用量が別途課金される。
Synapse SQLには以下の2種類が存在する。
サーバレスは、blobコンテナなどにファイルをおいて、それを外部テーブルとして読み出すので、データ自体は外部にある。 一方、専用SQLプールはサーバをたてて、そこにデータを置く。専用SQLプールはちゃんとユーザを作成して権限を与える必要がある。さらに、ワークロードグループやワークロードクラシファイアーなども出てくる。よくわからないので、確認要!
専用SQLプール
分散処理(ディストリビューション)
クエリリクエストは、コントロールノードで受付られる。コントロールノードは複数あるコンピュートノードにリクエストを振り分けて並行処理させる。SQLプールには複数のコンピュートノードが存在する(もちろん1つのみに設定することもできる)。
一方、データは60個のディストリビューションに分割して保持される。ディストリビューションはAzure Storageに保存される。
データが分割されているのでコンピュートノードが並行処理することができる。例えば、コンピュートノードとディストリビューションを1対1で対応させるといった感じで並行処理する。
さらにこの構成により、コンピュートとストレージを別々にスケールさせることができる。
均等に分散したデータを含むディストリビューション列を選択する
ディストリビューションの選択にあたっては次を参考にする。
- 多数の一意の値を含む。
- NULL がない、または少ししか NULL がない。
- 日付列ではない。
テーブルの種類
SQL poolのテーブルは3種類ある。
- ラウンドロビン:物理的に異なるストレージに順繰りにいれていく
- ハッシュテーブル:特定の列をハッシュ値に変換して、ハッシュ値ごとにストレージをわける
- レプリケイトテーブル:すべてのストレージにデータを入れる
デフォルトではラウンドロビンテーブルになっている。SSMSのテーブルアイコンでテーブルタイプを見分けることができる
DBCC PDW_SHOWSPACEUSED('テーブル名')で、テーブルの各レコードが保管されているノードIDを確認できる。
ラウンドロビンテーブルに対して、GROUP BY句で値ごとにグループ化しようとすると、同じ値でも別のノードに分散してしまっているため、データシャッフリングが必要となる(データをシャッフルして、同じ値を同じノードに集める)
WITH句でテーブルタイプを指定できる。
CREATE TABLE [dbo].SalesFact
WITH
(
DISTRIBUTION = HASH (CustomerID)
)
ツール
- SSMS (SQL Server Management Studio)
- PostgreSQLでいうところのpgAdminのようなツール。
パーティション
データをパーティションと呼ばれる小さなグループにしておく。
WHERE句でフィルターするときに、パーティションを使用すると読み込むデータを絞ることができ、高速化できる。
通常は日付でパーティションされる。
ディストリビューションとパーティションあたり、最低でも100万行は必要。
CREATE PARTITION FUNCTION myRangePF1 (datetime2(0)) AS RANGE RIGHT FOR VALUES ('2022-04-01', '2022-05-01', '2022-06-01') ; GO CREATE PARTITION SCHEME myRangePS1 AS PARTITION myRangePF1 ALL TO ('PRIMARY') ; GO CREATE TABLE dbo.PartitionTable (col1 datetime2(0) PRIMARY KEY, col2 char(10)) ON myRangePS1 (col1) ; GO
- LEFT 範囲は、間隔値がデータベース エンジンによって左から右への昇順で並べ替えられる場合に、境界値が境界値間隔の左側に属することを指定します。 つまり、境界の最大値がパーティション内に含まれます。
- RIGHT 範囲は、間隔値がデータベース エンジンによって左から右への昇順で並べ替えられる場合に、境界値が境界値間隔の右側に属することを指定します。 つまり、境界値が最も小さい値が各パーティションに含まれます。
パーティションの切り替え
CREATE TABLE [dbo].[FactInternetSales_20000101_20010101] WITH ( DISTRIBUTION = HASH([ProductKey]) , CLUSTERED COLUMNSTORE INDEX , PARTITION ( [OrderDateKey] RANGE RIGHT FOR VALUES (20000101,20010101 ) ) ) AS SELECT * FROM [dbo].[FactInternetSales_20000101] WHERE [OrderDateKey] >= 20000101 AND [OrderDateKey] < 20010101; ALTER TABLE dbo.FactInternetSales_20000101_20010101 SWITCH PARTITION 2 TO dbo.FactInternetSales PARTITION 2;
列志向
Azure SQL Databaseは行志向データベースだが、Azure Synapse AnalyticsのSQL Poolは列志向データベースになっている。
インデックス
クラスター化された列ストアインデックス
- デフォルトでは、列ストアインデックスが提供される
- データ型がvarchar(max)、nvarchar(max)、varbinary(max)の場合はクラスター化された列ストアインデックスは使用できない
- テーブルに付き、1つしかインデックスは作成できない
- インデックス作成時に特定の列を指定しない
CREATE TABLE myTable ( id int NOT NULL, lastName varchar(20), zipCode varchar(6) ) WITH ( CLUSTERED COLUMNSTORE INDEX );
クラスター化されたインデックス
- クラスター化された列ストアインデックスが作成できない場合は、こちら。
- テーブルに付き、1つしかインデックスは作成できない
- インデックス作成時に特定の列を指定する
CREATE TABLE myTable ( id int NOT NULL, lastName varchar(20), zipCode varchar(6) ) WITH ( CLUSTERED INDEX (id) );
非クラスターインデックス
CREATE INDEX zipCodeIndex ON myTable (zipCode);
ヒープテーブル
- キャッシュ上にテーブルを作成することで、データ書き込みと読み取りを高速化できる。
- 本番のテーブルにロードする前のデータ変換などに利用する。
- ヒープではクラスター化インデックスを使用できない。
- 非クラスター化インデックスを作成することができる。
CREATE TABLE <テーブル名> ( ... ) WITH ( HEAP )
古いバージョンの専用SQLプール(旧称 SQL DW)の暗号化
データマスキング
- 組み込みの関数で簡単にデータをマスキングできる。
- Eメールや電話番号などが含まれる列を自動で見つけ出してマスキングのレコメンドもしてくれる。
- 管理者権限をもっていると、マスキングが適用されない。
- 以下のようにSQLでもマスキングを追加できる。
- データマスクは、見た目上わからなくなってはいるが、暗号化はされていないことに注意。
ALTER COLUMN [Phone Number] ADD MASKED WITH (FUNCTION = 'partial(5,"XXXXXXX",0)'
監査
- 簡単にログアナリティクスやストレージに監査ログを貯めることができる機能
設定方法
- 対象のAzure Synapse Analyticsの左側メニューで、Security配下のAuditingを選択する。
- 監査機能をイネーブルする。
- Log Analytics等を選択し、保存先を指定する。
データの検出と分類
- どのテーブルにどのような機密データが保持されているかを登録しておくことができる。
- 「データの検出と分類」をクリックすると、自動でカラムを読み取って、センシティブな情報を保持するカラムを見つけてくれる。
- そのうえで、分類のレコメンデーションをしてくれる。
Azure AD認証で専用SQLへアクセス
https://learn.microsoft.com/ja-jp/azure/synapse-analytics/sql/active-directory-authentication
- Azure ADのアカウントで専用SQLにアクセスできる(SQL認証のアカウント/パスワードがいらない)
- SSMSの認証画面で「Azure Active Directory - Universal with MFA」を選択して、いつもAzureにアクセスしているアカウントを入力する。
- 事前に以下のように専用SQLでユーザーを作成し、ロールを割り当てておく必要がある。
CREATE USER [xxxx@xxxxxx.onmicrosoft.com] FROM EXTERNAL PROVIDER WITH DEFAULT_SCHEMA = dbo; CREATE ROLE [NewRole] GRANT SELECT ON SCHEMA [dbo] TO [NewRole] EXEC sp_addrolemember N'NewRole', N'xxxx@xxxxxx.onmicrosoft.com'
行レベルセキュリティ
- 簡単に言うと、WHERE句を利用して、クエリを実行するユーザの名前が含まれる行のみを返すようにする。
列レベルセキュリティ
- GRANT文で特定の列のみSELECTを許可することで、列レベルのセキュリティを実現する。
GRANT SELECT ON Membership(MemberID, FirstName, LastName, Phone, Email) TO TestUser;
マネージドIDを利用した外部テーブルの作成
- SASではなく、マネージドIDを利用して外部テーブルを作成することができる。
- 下記の
IDENTITY = 'Managed Identity'
というところで、マネージドID認証を指定している。 - Azure Synapse Analyticsワークスペースには、事前にストレージへのアクセス権限をIAMで付与しておく必要がある(必要であれば、ストレージのACLの権限も。)
CREATE DATABASE SCOPED CREDENTIAL SynapseWorkspaceIdentity WITH IDENTITY = 'Managed Identity' CREATE EXTERNAL DATA SOURCE log_data_managed WITH ( LOCATION = 'https://<storage_account>.dfs.core.windows.net/<container>/<path>' CREDENTIAL = SynapseWorkspaceIdentity, TYPE = HADOOP ) CREATE EXTERNAL FILE FORMAT TextFileFormatManaged WITH ( FORMAT_TYPE = DELIMETEDTEXT, FORMAT_OPTIONS ( FIELD_TERMINATOR = ',', FIRST_ROW=2 ) ) CREATE EXTERNAL TABLE logdatamanaged ( ... ) WITH ( LOCATION = 'data/Log.csv', DATA_SOURCE = log_data_managed, FILE_FORMAT = TextFileFormatManaged )
動的管理ビューを使用したワークロードの確認
結果セットのキャッシュ
- 結果セットのキャッシュをイネーブルすることができる。
- 自動的にクエリ結果をデータベース上にキャッシュする。
- 高速化できる。
- 結果セットのキャッシュが使用されない場合には、48時間毎に消去される。
- または、結果セットのキャッシュがいっぱいになったら削除される。
-- キャッシュがイネーブルされているデータベースを確認する -- キャッシュ ヒットの場合は 1、キャッシュ ミスの場合は 0、結果セットのキャッシュが使用されなかった理由については負の値が返される。 SELECT name, is_result_set_caching_on FROM sys.databases -- データベースのクエリ ストアを有効にする -- masterデータベース上で実行する。 -- https://learn.microsoft.com/ja-jp/sql/t-sql/statements/alter-database-transact-sql-set-options?toc=%2Fazure%2Fsynapse-analytics%2Fsql-data-warehouse%2Ftoc.json&bc=%2Fazure%2Fsynapse-analytics%2Fsql-data-warehouse%2Fbreadcrumb%2Ftoc.json&view=azure-sqldw-latest&preserve-view=true#enable-query-store-for-a-database ALTER DATABASE [database_name] SET QUERY_STORE = ON; -- データベースに対して結果セットのキャッシュを有効にする ALTER DATABASE [database_name] SET RESULT_SET_CACHING ON;
ワークロード管理
- ワークロード識別子を作成し、それをユーザーグループに割り当てることで、例えばETL処理を優先させて、アドホックな分析の優先度を下げるといったことができる。
- ワークロード識別子の作成方法はこちら。https://learn.microsoft.com/ja-jp/azure/synapse-analytics/sql-data-warehouse/quickstart-create-a-workload-classifier-portal
復元ポイントからの復元
- ユーザーは、この機能を使用して、大規模な変更の前後に、スナップショットを手動でトリガーしてデータ ウェアハウスの復元ポイントを作成できます。作成方法はこちら。 https://learn.microsoft.com/ja-jp/azure/synapse-analytics/backuprestore/sqlpool-create-restore-point
- 復元方法はこちら。 https://learn.microsoft.com/ja-jp/azure/synapse-analytics/backuprestore/restore-sql-pool#restore-an-existing-dedicated-sql-pool-through-the-azure-portal
復元ポイントのリテンション期間 復元ポイントのリテンション期間の詳細は次のとおりです。 - 専用 SQL プールは、7 日間の保有期間に達したときと、復元ポイントの合計数が少なくとも 42 個ある場合 (ユーザー定義と自動の両方を含む) に、復元ポイントを削除します。 - 専用 SQL プールが一時停止しているときはスナップショットは取得されません。 - 復元ポイントの経過時間は、復元ポイントが取得された時点 (SQL プールが一時停止された時点を含む) からのカレンダーの絶対日数によって測定されます。 - 任意の時点で、復元ポイントが 7 日間の保有期間に達していない限り、専用 SQL プールは最大 42 個のユーザー定義の復元ポイントまたは 42 個の自動復元ポイントを格納できることが保証されています - スナップショットが取得され、専用 SQL プールが 7 日間を超えて一時停止されてから再開した場合、復元ポイントの数が合計で 42 になるまで (ユーザー定義と自動の両方を含む)、復元ポイントが維持されます。
IDENTITY を使用して、Azure Synapse Analytics の専用 SQL プールで代理キーを作成する
- テーブルの代理キーは、各行の一意の識別子を持つ列
- IDENTITY プロパティを使うと、この目的を簡単かつ効果的に達成でき、読み込みのパフォーマンスが影響を受けることもありません。
CREATE TABLE dbo.T1 ( C1 INT IDENTITY(1,1) NOT NULL , C2 INT NULL ) WITH ( DISTRIBUTION = HASH(C2) , CLUSTERED COLUMNSTORE INDEX ) ;
- IDENTITY(1, 1)の最初の1は開始値。2番目の1は増分値。
- IDENTITYの詳しい説明はこちら。https://learn.microsoft.com/ja-jp/sql/t-sql/statements/create-table-transact-sql-identity-property?view=sql-server-ver16
Azure AD の ID で専用SQLに接続する。
Azure AD ID にマップされる包含ユーザーを作成する
- SQL Database で Azure Active Directory 認証を使用するには、Azure AD ID に基づく包含データベース ユーザーを使用する必要があります。
- 包含データベース ユーザーは、master データベース内にログインを持たず、データベースに関連付けられている Azure AD の ID にマップされます。
- Azure AD の ID には、個々のユーザー アカウントにもグループ アカウントにもなります。
CREATE USER [appName] FROM EXTERNAL PROVIDER;
Azure AD の ID で SSMS または SSDT を利用して接続する
- Azure AD の ID を使用して SQL Database に接続する。
キャッシュ ヒットと使用率
- キャッシュヒット率とキャッシュ使用率の関係についてまとめられている。
サーバレスSQLプール
OPENROWSET を使用して JSON ファイルを読み取る
https://learn.microsoft.com/ja-jp/azure/synapse-analytics/sql/query-json-files
- OPENROWSETで行区切りの JSON ファイルを読み取り、個別の行として返す。
select top 10 * from openrowset( bulk 'https://pandemicdatalake.blob.core.windows.net/public/curated/covid-19/ecdc_cases/latest/ecdc_cases.jsonl', format = 'csv', fieldterminator ='0x0b', fieldquote = '0x0b' ) with (doc nvarchar(max)) as rows
- OPENROWSET では、ファイルから 1 つのテキスト値が読み取られます。
- OPENROWSET では値が BulkColumn として返され、OPENJSON 関数に BulkColumn が渡されます。
- OPENJSON では、BulkColumn 配列内の JSON オブジェクトの配列が反復処理される。
外部テーブルの作成
CREATE EXTERNAL TABLE { database_name.schema_name.table_name | schema_name.table_name | table_name } ( <column_definition> [ ,...n ] ) WITH ( LOCATION = 'folder_or_filepath', DATA_SOURCE = external_data_source_name, FILE_FORMAT = external_file_format_name [, TABLE_OPTIONS = N'{"READ_OPTIONS":["ALLOW_INCONSISTENT_READS"]}' ] [, <reject_options> [ ,...n ] ] ) [;] <column_definition> ::= column_name <data_type> [ COLLATE collation_name ] <reject_options> ::= { | REJECT_TYPE = value, | REJECT_VALUE = reject_value, | REJECT_SAMPLE_VALUE = reject_sample_value, | REJECTED_ROW_LOCATION = '/REJECT_Directory' }
LOCATION = 'folder_or_filepath'
- Azure Blob Storage にある実際のデータのフォルダーまたはファイル パスとファイル名を指定
- Hadoop 外部テーブルとは異なり、ネイティブ外部テーブルでは、パスの最後に /** を指定しない限り、サブフォルダーが返されません。
https://learn.microsoft.com/ja-jp/azure/synapse-analytics/sql/query-folders-multiple-csv-files
- フォルダー内のすべてのファイルを読み取ったり、フォルダーを再帰的に読み取ったり、複数のフォルダーからファイルを読み取るといったこともできる。
- 外部テーブルに対しては、以下のデータ定義言語 (DDL) ステートメントのみを使用できます。
- 他のDDL(ALTER、TRUNCATE)はサポートされていない。
- データ操作言語 (DML) の削除、挿入、更新の操作もサポートされていない。
Azure Synapse Sparkプール の外部テーブル定義を同期する
https://learn.microsoft.com/ja-jp/azure/synapse-analytics/sql/develop-storage-files-spark-tables
- Parquet または CSV に基づく、Azure Storage に配置された各 Spark 外部テーブルに対して、サーバーレス SQLプール データベースに外部テーブルが作成されます。
- そのため、Spark プールをシャットダウンしても、サーバーレス SQL プールから Spark 外部テーブルに対するクエリを引き続き実行できます。
PolyBase
- SQLサーバーからBlobストレージ等にある外部データを直接照会できる。
- Polybase では、1 MB 未満の行を読み込みます。1行が 1MB を超えることはできない。
Pivotによる縦持ちから横持ちへの変換
- PIVOT では、式内の 1 つの列にある複数の一意の値を出力内の複数の列に変えることにより、テーブル値式が行列変換されます。
- UNPIVOT 関係演算子の機能は PIVOT 関係演算子の逆で、テーブル値式の複数の列を列値に行列変換します。
- PIVOT の構文は、SELECT...CASE ステートメントを複雑に組み合わせて同じ操作を指定する場合よりも単純で読みやすくなります。
https://style.potepan.com/articles/25072.html
透過的な暗号化
- データベース、バックアップ、ログ ファイルを暗号化し、バックアップやログからデータに直接アクセスできないようにする。
- デフォルトで有効
Always Encrypted
- クレジット カード番号や国の識別番号などの機密データを保護するために設計された機能
- Always Encryptedを使用すると、クライアントはクライアント アプリケーション内の機密データを暗号化し、暗号化キーをデータベース エンジンに公開することはありません。
- これにより、データを所有し、それを表示できるユーザーと、データを管理するがアクセス権を持たない必要があるユーザー (オンプレミスのデータベース管理者、クラウド データベースオペレーター、またはその他の高い特権を持つ未承認のユーザー) を分離できます。
- その結果、Always Encryptedにより、顧客は自信を持って機密データをクラウドに格納し、悪意のある内部関係者によるデータ盗難の可能性を減らすことができます。
決定論的暗号化とランダム暗号化
決定論的暗号化
- 指定されたプレーンテキスト値に対して常に同じ暗号化値が生成される。
- 暗号化された列で結合、グループ化、インデックス作成を行うことができる。
暗号化をランダム化
- ランダム化された暗号化は安全性が上がる。
- 暗号化された列に対して検索、グループ化、インデックス作成、結合ができない。
Azure Synapse AnalyticsワークスペースのIPアドレス制限
https://learn.microsoft.com/ja-jp/azure/synapse-analytics/security/synapse-workspace-ip-firewall
- Azure Synapse Analyticsワークスペースへのアクセス元IPを制限できる。
- ワークスペースの左側メニュー「ネットワーク」でIPアドレスを指定できる。
- 「Azure サービスおよびリソースに、このワークスペースへのアクセスを許可する」というオプションもある。
Data Factory
- データ変換を行うサービス
構成
メインとなるコンポーネントは以下の通り。詳細は次のリンクを参照のこと。
https://learn.microsoft.com/ja-jp/azure/data-factory/introduction#top-level-concepts
- Linked Service:データソースからデータを取り込む(Ingest)ため、あるいはデータデスティネーションへデータをロードするための接続文字情報を保有する。
- Dataset:データ構造(Data Structure)を保持する。データそのものではなく、 データを参照する名前付きのビュー。
- Activity:変換ロジック
- Pipeline:複数のActivityを束ねる
- Integration Runtime:データ抽出、変換、ロードを行うコンピューティングインフラストラクチャー
- パイプライン実行:パイプラインを実行するインスタンス
Data Factory内で利用できるサービス
- Data Flow
- Power Query
統合ランライム
データ抽出、変換、ロードの処理は、コンピューティングはAzure統合ランタイム
で行う。
統合ランタイムはまったく別の用途にも用いられる。VMにあるデータをSynapse Analyticsに連携する場合は、VMにセルフホステッド統合ランタイム
をインストールする必要がある。でないと、VMにあるデータを連携できない。
種類
- Azure Integration Runtime(サーバーレス)
- Azure マネージド VNET 統合ランタイム
- セルフホステッド統合ランタイム
セルフホステッド統合ランタイムのセットアップ
まずデータファクトリの「管理」画面で、Intergration rutimes
を選択する。
Newをクリックし、セルフホステッド統合ランタイムをセットアップする。
すると、認証キーが表示されるので、VMにインストールしたセルフホステッド統合ランタイムに認証キーを入力する。
これでVMがデータファクトリに登録される。
データファクトリ側では、新しいデータセットでファイル
→ ファイルシステム
を選択する。
パイプラインの作成と実行
コピーアクティビティのケースでは、「データのコピー」アクティビティに加えて、コピー元とコピー先のリンクされたサービス(計2個)とデータセット(計2個)を作成する。
リンクされたサービスとして、例えばストレージアカウントの接続文字列とアカウントキーを登録する。データセットとして、例えばリンクされたサービス、ストレージのファイルパス、区切り記号、エスケープ文字などを登録する。
上記リソースを作成したら、実行前に「発行」することで正式に登録されて実行可能になる。
今すぐにパイプラインを実行する場合は、「トリガーの追加」から「今すぐトリガー」をクリックする。
パイプラインの実行状況は「Monitor」で確認できる。Monitorでは詳細な実行履歴、エラーメッセージを確認できる。
コピーアクティビティ
- Azure SQL databaseなどのDBを
ソース
として選択した場合、テーブルそのものを指定するのに加えて、クエリを記述して読み込むことができる。
コピーアクティビティにおけるファイル階層構造の保持
- Azure Data Lake Storageをシンクとして指定する場合に、copyBehaviorで、PreserveHierarchy、FlattenHierarchy、MergeFilesが選べる。
- PreserveHierarchy (既定値):ターゲット フォルダー内でファイル階層を保持します。 ソース フォルダーへのソース ファイルの相対パスはターゲット フォルダーへのターゲット ファイルの相対パスと同じになります。
- FlattenHierarchy:ソース フォルダーのすべてのファイルをターゲット フォルダーの第一レベルに配置します。 ターゲット ファイルは、自動生成された名前になります。
- MergeFiles:ソース フォルダーのすべてのファイルを 1 つのファイルにマージします。 ファイル名を指定した場合、マージされたファイル名は指定した名前になります。 それ以外は自動生成されたファイル名になります。
Get Metadataアクティビティ
BlobストレージやAzure SQL DatabaseのメタデータをData flow内で取得できる。どういったメタデータが取得できるかはリソースによって異なる。詳細は以下を参照。Get Metadataアクティビティでストレージ内のファイル名をすべて取得し、For Eachアクティビティで各ファイルに対して処理を行うといったことができる。
データフローアクティビティ
- コピーアクティビティでクエリを記述することでもデータ変換は可能だが、Data Flowを使用すると、クエリをハードコードせずに済む。
- コードを書かずにデータ変換できる。
- Apache Sparkクラスターが作成され、クラスター上で実行される。Apache Sparkクラスターの起動に時間がかかる。
- デバッグモードで変換結果を確認できる。
- デバッグモードでも時間単位で課金される。
- 数分しか使わなくても1時間分課金される。
- 単価はvCore当たり。最低でもvCoreが8個使用される。
代表的な変換
すべての「変換」が以下にリストアップされている。
https://learn.microsoft.com/ja-jp/azure/data-factory/data-flow-transformation-overview
- 派生列
- シンク
- 条件分割変換
- 条件に応じてデータを分割し、フローを分岐する
スキーマドリフト
"スキーマの誤差"のこと。コピーアクティビティのソースやシンクで、スキーマドリフトを許可することで、仮に列が追加や削除された場合であってもエラーにならずに誤差として扱うことができる。
マッピングでAuto mapping
がオンだと誤差の列もシンクに書き込まれる。
誤差列を参照するには派生列
で列名をつける必要がある。Columnを$$
、Expressionを$$
と設定することで、入力された列名をそのまま設定することができる。
データファクトリーにおけるGitの利用
Azure DevOpsのアカウントを作ってプロジェクトおよびリポジトリを作成し、データファクトリでManage
のGit Configuration
でAzure DevOpsのGitリポジトリを設定することで、データファクトリで作成したパイプラインなどを保存できる。
Azure DevOpsのAzure Pipelines Releasesを使用して、本番用に作成しておいたデータファクトリ環境に対して、デプロイを自動化できる。
料金
https://azure.microsoft.com/ja-jp/pricing/details/data-factory/data-pipeline/
- アクティビティ実行回数とその実行時間に基づいて課金される
- 統合ランタイムは3種類の構成があり、それぞれ料金が異なる
- 別途、Data Flowも課金される。 Data Flowクラスターの実行時間とデバッグ時間に対して課金される。
- 「Monitor」のパイプライン実行履歴で、どれくらい消費しているかがわかる。
増分コピー
- タンブリングウィンドウを使用することで、定期的に過去のデータを取り出すことができる。
- タンブリングウィンドウでは、開始日時はUTCとみなされるため、タイムゾーンの指定は不要。
- ファイル読み込み動作の指定(
File loading behavior
)で、Incremental load: LastModifiedDate
を選択する。
カスタマー マネージド キーを使用した 二重暗号化
https://learn.microsoft.com/ja-jp/azure/data-factory/enable-customer-managed-key
- デフォルトでは、ランダムに生成されてデータ ファクトリに一意に割り当てられる Microsoft 管理のキーで暗号化される。
- さらに、カスタマーマネージドキーを設定して二重に暗号化できる。
- Azure Key Vaultでキーを生成し、データファクトリに対して"取得"、"キーを折り返す"、"キーの折り返しを解除"のアクセス許可する
- データファクトリのManageからCustomer managed keyを選択し、カスタマーマネージドキーのURIを設定する。
Azure Monitorでの監視
https://learn.microsoft.com/ja-jp/azure/data-factory/monitor-using-azure-monitor
- Data Factory では、パイプライン実行データを 45 日間だけ格納します。 データをより長期間保持する場合は、Azure Monitor を使用してください。
- データファクトリで監視できるメトリックには次のようなものがある(PipelineFailedRunsなど) https://learn.microsoft.com/ja-jp/azure/data-factory/monitor-metrics-alerts
パイプラインに注釈をつける
- 注釈は、パイプライン、データセット、リンク サービス、トリガーに割り当てることができる静的な値
- [プロパティ] アイコンで [+ 新規] ボタンをクリックし、注釈に適切な名前を付けます。
- [監視] タブに移動すると、[パイプラインの実行] でこの注釈をフィルター処理できます。
- アクティビティ レベルで動的な値を監視することもできる。
Azure 統合ランタイムの作成
- デフォルトのAzure統合ランタイムが使用される。Azure IR の場所は自動解決されます。
- Azure統合ランタイムを自分で作成することもできる。例えば、リージョンを指定したい場合などに用いる。 https://learn.microsoft.com/ja-jp/azure/data-factory/create-azure-integration-runtime?tabs=data-factory#create-an-azure-ir-via-ui
子パイプラインでのエラー
- 親パイプラインから子パイプラインを呼び出し、子パイプラインがエラーとなった場合には、親パイプラインもエラーとして実行データに記録される。
オンプレとの連携
- オンプレからデータを取り込むにはLinked serviceを作成する。この際、パイプラインやアクティビティは使用しない。
Azure Data Factory と Azure Synapse Analytics でのバイナリ
https://learn.microsoft.com/ja-jp/azure/data-factory/format-binary
- バイナリ データセットは、Copy アクティビティ、GetMetadata アクティビティ、または Delete アクティビティで使用できます。
- バイナリ データセットを使用する場合、サービスではファイルの内容は解析されず、そのまま処理されます。そのため処理が早い。
Azure Event Hubs
ストーリーミング送受信プラットフォーム。Apache Kafkaみたいな感じ。
名前空間を作成して、そこにEvent Hub
を作成する。エンドポイントURLが払いだされるので、そこにデータを送信する。
いったん受信したイベントは設定した期限が経過されるまで削除されない。
チェックポイントを使用することで、一度読み出したイベントを再度読みださないようにできる。
複数のパーティションで並列して読み書きすることで、一度に処理できる量を増やしている。
Capture機能によるBlobストレージへの書き込み
Azure Event Hubs Capture を使用すると、Azure Stream Analyticsを使用しなくても、Event Hubs のストリーミング データをAzure Blob Storage
またはAzure Data Lake Storage Gen1
、Azure Data Lake Storage Gen2
に自動的に配信できる。
https://learn.microsoft.com/ja-jp/azure/event-hubs/event-hubs-capture-enable-through-portal
イベント ハブ にパーティションを動的に追加する
https://learn.microsoft.com/ja-jp/azure/event-hubs/dynamically-add-partitions
- パーティションの数は、イベント ハブの作成時に指定できます。
- イベント ハブを作成した後に、パーティションを追加することもできる。
- ※パーティションを動的に追加する方法は、Event Hubs の premium および専用レベルでのみ利用できます。
Azure Stream Analytics
ストリーミングデータに対して、リアルタイムにクエリを実行して出力できる。
読み取り開始時間をNowにすると、現在時刻後に受信したデータのみ取り出す。
読み取り開始時刻をカスタムに設定すると、過去のイベントも読み出しできる。
参照データを使用する
https://learn.microsoft.com/ja-jp/azure/stream-analytics/stream-analytics-use-reference-data
- ストリーミングデータに参照データをJOINできる。
- 参照データは、Azure Blob Storage および Azure SQL Databaseに配置する。
- 参照データ入力を定義する方法については下記を参照する。
https://learn.microsoft.com/ja-jp/azure/stream-analytics/sql-reference-data
ウィンドウ関数
タンブリング ウィンドウ
一定時間ごとに受信したイベントをグループ化する。
各ウィンドウ時間は重なりあわない。
例)10秒ごとに区切って、発生したイベントを知りたい。
SELECT System.Timestamp() as WindowEndTime, TimeZone, COUNT(*) AS Count FROM TwitterStream TIMESTAMP BY CreatedAt GROUP BY TimeZone, TumblingWindow(second,10)
ホッピング ウィンドウ
一定時間ごとに受信したイベントをグループ化する。
各ウィンドウ時間は重なり合う。
例)過去10秒間に発生したイベント数を5秒間隔に知りたい。
SELECT System.Timestamp() as WindowEndTime, Topic, COUNT(*) AS Count FROM TwitterStream TIMESTAMP BY CreatedAt GROUP BY Topic, HoppingWindow(second,10,5)
スライディング ウィンドウ
イベントを受信したタイミングで一定時間さかのぼってグループ化する。
例)10秒間に3回以上イベントが発生したらアラートしたい。
SELECT System.Timestamp() as WindowEndTime, Topic, COUNT(*) AS Count FROM TwitterStream TIMESTAMP BY CreatedAt GROUP BY Topic, SlidingWindow(second,10) HAVING COUNT(*) >=3
セッション ウィンドウ
最初のイベントが発生したときに開始されます。 最後にイベントが取り込まれてから指定されたタイムアウト期間内に別のイベントが発生した場合、ウィンドウはその新しいイベントを含むように拡張されます。 タイムアウト期間内にどのイベントも発生しなかった場合、ウィンドウはタイムアウト時に閉じられます。指定されたタイムアウト期間内にイベントが発生し続けた場合、セッション ウィンドウは最大期間に達するまで拡張し続けます。
例)5秒以上間隔が開いていないイベントの数を知りたい。
SELECT System.Timestamp() as WindowEndTime, Topic, COUNT(*) AS Count FROM TwitterStream TIMESTAMP BY CreatedAt GROUP BY Topic, SessionWindow(second,5,10)
スナップショット ウィンドウ
まったく同じ時間(病態ン位)に受信したイベントをグループ化する。
例)まったく同じタイミングで発生したイベントを知りたい。
SELECT System.Timestamp() as WindowEndTime, Topic, COUNT(*) AS Count
FROM TwitterStream TIMESTAMP BY CreatedAt
GROUP BY Topic, System.Timestamp()
https://learn.microsoft.com/ja-jp/azure/stream-analytics/stream-analytics-window-functions
OVER句
LAG/LAST句
https://learn.microsoft.com/ja-jp/stream-analytics-query/lag-azure-stream-analytics
https://learn.microsoft.com/ja-jp/stream-analytics-query/last-azure-stream-analytics
- LAG 分析演算子を使用すると、特定の制約内でイベント ストリーム内の "前の" イベントを参照できる。
LAG(<scalar_expression >, [<offset >], [<default>]) OVER ([PARTITION BY <partition key>] LIMIT DURATION(<unit>, <length>) [WHEN boolean_expression])
例:以前の null 以外のセンサーの読み取り値を見つける
SELECT sensorId, LAG(reading) OVER (PARTITION BY sensorId LIMIT DURATION(hour, 1) WHEN reading IS NOT NULL) FROM input
UDF
JavaScriptでユーザー定義関数を作成して、クエリで利用することができる。
イベントの間隔を検出する
- イベントの間隔を検出するためのクエリ
- LAST関数
Azure Stream Analyticsのメトリック
https://learn.microsoft.com/ja-jp/azure/stream-analytics/stream-analytics-job-metrics
- 上記のメトリックを監視できる。
- 特に以下のメトリックが重要。
- バックログされた入力イベント
- バックログされた入力イベントの数。 このメトリックの 0 以外の値は、ジョブが受信イベントの数に追いつかないことを意味します。 この値がゆっくり増加する場合や一貫して 0 以外である場合は、ジョブをスケールアウトする必要があります。 詳細については、「ストリーミング ユニットの理解と調整」を参照してください。
- データ変換エラー
- 初期入力イベント
- アプリケーション タイム スタンプが受信時間より 5 分以上早いイベント。
- 遅延入力イベント
- 構成済みの到着遅延許容期間より後に到着したイベント。 詳細については Azure Stream Analytics のイベントの順序に関する考慮事項を確認してください。
- 順不同のイベント
- イベント順序ポリシーに基づいて、削除された、または調整されたタイムスタンプが付与された、順不同で受信したイベントの数。 このメトリックは、順不同の許容範囲ウィンドウ 設定の構成によって影響を受ける可能性があります。
ストリーミングユニット
- ストリーミングユニットの使用率が100%に達すると、ジョブが失敗し始める
- 常にストリーミングユニットの使用率を監視する必要がある
ストリーミングユニットに関するベストプラクティス
https://learn.microsoft.com/ja-jp/azure/stream-analytics/stream-analytics-streaming-unit-consumption
- PARTITION BY を使用しないクエリでは、最低6つのストリーミングユニットを設定する。
- 必要以上のストリーミングユニットを割り当てる必要がある。
イベント順序ポリシーの作成
https://learn.microsoft.com/ja-jp/azure/stream-analytics/event-ordering
- イベント/アプリケーション時間は、イベント ペイロードに含まれるタイムスタンプです (イベントが生成された時間)
- 到着時刻は、イベントが入力ソース (イベント ハブ/IoT ハブ/Blob ストレージ) に到達した時間のタイムスタンプです。
- 既定では、Stream Analytics は到着時間でイベントを処理しますが、クエリで TIMESTAMP BY 節を使用することによって、イベント時間でイベントを処理することもできます。
- 遅延着信と順不同のポリシーは、イベント時間でイベントを処理する場合にのみ適用できます
- 遅延到着ポリシー
- さまざまな理由によりイベントが遅れて到着することがあります。 たとえば、40 秒遅れて到着したイベントのイベント時間が 00:10:00 である場合、到着時間は 00:10:40 になります。 遅延到着ポリシーを 15 秒に設定した場合、15 秒よりも遅く到着するイベントは、削除されるか (Stream Analytics によっては処理されない)、そのイベント時間が調整されます。 上記の例では、イベントは 40 秒遅れて (ポリシー設定よりも遅れて) 到着したため、イベント時間は遅延到着ポリシーの最大値に合わせて 00:10:25 (到着時間 - 遅延到着ポリシーの値) に調整されます。 既定の遅延到着ポリシーは 5 秒です。
- 順不同ポリシー
- イベントは、順不同で到着する場合もあります。 イベント時間を遅延到着ポリシーに基づいて調整した後、順不同のイベントを自動的に削除または調整することもできます。 このポリシーを 8 秒に設定した場合、順不同であるものの、8 秒以内到着したイベントは、イベント時間で並べ替えられます。 設定した時間を超えたイベントは、削除されるか、順不同ポリシーの最大値に調整されます。 既定の順不同ポリシーは 0 秒です。
Azure Stream Analytics での時間の処理について
https://learn.microsoft.com/ja-jp/azure/stream-analytics/stream-analytics-time-handling
- 基準値
- イベントがどのポイントまでストリーミング プロセッサで受信されているかを示すイベント時間マーカーです。
- 到着遅延イベント
- 到着遅延許容期間の定義により、Azure Stream Analytics では、受信イベントごとにイベント時間が到着時間と比較されます。 イベント時間が許容期間外の場合は、イベントを削除するようにシステムを構成するか、イベントの時間を許容期間内になるように調整することができます。基準値が生成されると、サービスは基準値よりも低いイベント時間でイベントを受信する可能性があります。 これらのイベントを破棄するか、イベントの時間を基準値に調整するようにサービスを構成することができます。
- 早期到着イベント
- 到着遅延許容値ウィンドウの反対のような、早期到着ウィンドウと呼ばれるもう 1 つの概念にお気づきかもしれません。 このウィンドウは 5 分に固定されており、到着遅延許容値ウィンドウとは異なる目的を果たします。Azure Stream Analytics は完全な結果を保証しているため、ジョブの最初の出力時刻 (入力時刻ではなく) として、ジョブの開始時刻を指定するだけです。 ジョブの開始時刻は、完全なウィンドウ (ウィンドウの途中からだけでなく) が処理されるようにするために必要です。
ジョブの停止を監視
- Runtime Errorメトリックでは監視できない。Runtime Errorメトリックは、ジョブがデータを受信できるけれどクエリの処理がエラーになっている場合にのみ適用される。
- All Administrative operationsで監視できる。All Administrative operationの失敗ステータスを監視することにより、ジョブが予期せず停止した場合にアラートが発行される。
Azure IoT Hub
Azure IoT HubにIoTデバイスを登録すると、接続文字列が生成されるので、それをデバイス側で設定する。
デバイスがストリームデータをIoT Hubに送ってくるので、Azure Stream Analyticsを使ってクエリを実行してデータを取り出す。
Scala
- 型付き言語
- オブジェクト指向であり、関数型プログラミング言語
- コンパイルして、Javaのバイトコードになる。
- Java Runtime上で動く
- REPL (Read Evaluate Print and Loop)というコマンドラインユーティリティを使用する
インストール
以下からダウンロードして、インストールする。
https://docs.scala-lang.org/getting-started/index.html
以下のコマンドでscalaの対話モードに入れる。
> scala3 Welcome to Scala 3.2.2 (19, Java Java HotSpot(TM) 64-Bit Server VM). Type in expressions for evaluation. Or try :help. scala> 3 val res0: Int = 3 scala> 4 val res1: Int = 4 scala> res0 + res1 val res2: Int = 7
var と val
varはmutableな変数。
valはimutableな変数(Javaでいうと、const付きの変数)。
varはvariableの略で、valはvalueの略。
valに再代入しようとすると、reassignment to val
というエラーになる。
val
構文
- if 構文
var i = 8 if (i < 10) { println("i is less than 10") } else { println("i is more than and equal to 10") }
- for 構文
for (i<-1 to 10) println("the value is " + i)
- while 構文
var i = 0 while (i < 10) { println("the value is " + i) i += 1 }
- case 構文
val i = 29 i match{ case i if i<30 => println("i is less than 30") case i if i>=30 => println("i is more than and equal to 30") }
- 関数
def add(x:Int, y:Int): Int = { return x + y } val x = 2 val y = 1 println("x + y is " + add(x, y))
- リスト
var numbers:List[Int] = List(0, 1, 2, 3, 4) println("The number is " + numbers.head) // The number is 0 numbers.foreach{println} // 1 2 3 4 println("The number is " + numbers(2)) // The number is 1 var names:List[String] = List("Yamada", "Nishida", "Hanada") names.foreach(println) // Yamada Nishida Hanada println("The index is " + names.indexOf("Hanada")) // The index is 2
Azure Synapse Spark Pool
Azure Synapse AnalyticsのSpark poolでSparkクラスターを動かすことができる。
DriverノードがSparkジョブを受付け、それをExecutorノードに振り分ける。
料金
仮想コア当たりの分単位課金。
Sparkプールはサーバレスなので、Sparkプールを作成しただけでは料金が発生しない。
Sparkジョブをサブミットしてはじめてノードが作成されて課金される。
最低ノード数量は3個で、1つはDriverノードになる。
Sparkでは、実行されるべきタイミングまでコードを実行しない。
開発
Azure Synapse AnalyticsでApache Sparkプールを作成する。
次に、Azure Synapse AnalyticsのSynapse Studioを開き、Develop
でノートブックを作成する。
ノートブックに作成済みのSpark poolを割り当てる。
言語でSpark (Scala)
を選択する。
RDD (Resilient Deistributed Dateset)
次のようにsc.parallelize(data)
とすることで、並列分散処理のため、データを各ノードに分散して配置できる。
val data = Array(1, 2, 3, 4, 5) val rdd = sc.parallelize(data)
Spark データセット
並列で変換処理ができる。
変換処理により、新しいデータセットが作られる。
val data = Array(1, 2, 3, 4, 5) val rdd = sc.parallelize(data) val ds
Spark データフレーム
Sparkデータフレームは、Sparkデータセットに列名をつけたもの。
外部ファイルからデータフレームを作成できる。
SparkデータフレームはScala、Java、Python、Rで処理できる。
val data = Array(1, 2, 3, 4, 5) val rdd = sc.parallelize(data) val df = rdd.toDF() // データフレームに変換
RDD、データフレーム、データセットの比較についてはこちらを参照する。
https://yubessy.hatenablog.com/entry/2016/12/11/095915
列のソート
from pyspark.sql.functions import desc from pyspark.sql.functions import col sortted_df = df.sort(col("列名").desc()) display(sorted_df)
データのロード
Storege Gen2に配置されたLog.csv
ファイルをSparkデータフレームとしてロードする。
from pyspark.sql import SparkSession from pyspark.sql.types import * account_name = "datalake2000" container_name = "data" relative_path = "raw" adls_path = 'abfss://%s@%s.dfs.core.windows.net/%s' % (container_name, account_name, relative_path) spark.conf.set("fs.azure.account.auth.type.%s.dfs.core.windows.net" %account_name, "SharedKey") spark.conf.set("fs.azure.account.key.%s.dfs.core.windows.net" %account_name ,"V0bi0fr1nxs3Ox4fbPUGNDtE5XlGqvYT9tJJt0hkYS2ncXmiJtcW5DO2OLzffWKLQ410oITK3Ra7xN9Qjn1hhA==") df1 = spark.read.option('header', 'true') \ .option('delimiter', ',') \ .csv(adls_path + '/Log.csv') display(df1)
SQLの使用
Sparkデータフレームをもとにビューを作成して、それに対してSQLクエリを実行できる。
df1.createOrReplaceTempView("logdata") # データフレームからビューを作成 sql_1=spark.sql("SELECT Operationname, count(Operationname) FROM logdata GROUP BY Operationname") # ビューに対してSELECT分をクエリ sql_1.show() # または df1.createOrReplaceTempView("logdata") # データフレームからビューを作成 %%sql # Jupyterノートブックのマジックコマンド。このようにマジックコマンドを指定することで、次のSQLクエリをSQL言語で実行できる。 SELECT Operationname, count(Operationname) FROM logdata GROUP BY Operationname
SparkプールからSQL専用プールへのロード
事前に専用SQLプールにテーブルが作成されているとエラーになる。
%%Spark # SparkというマジックコマンドでScala言語を使用できる import com.microsoft.spark.sqlanalytics.utils.Constants import org.apache.spark.sql.SqlAnalyticsConnector._ # データスキーマの設定 val dataSchema = StructType(Array( StructField("Id", IntegerType, true), StructField("Correlationid", StringType, true), StructField("Operationname", StringType, true), StructField("Status", StringType, true), StructField("Eventcategory", StringType, true), StructField("Level", StringType, true), StructField("Time", TimestampType, true), StructField("Subscription", StringType, true), StructField("Eventinitiatedby", StringType, true), StructField("Resourcetype", StringType, true), StructField("Resourcegroup", StringType, true))) val df = spark.read.format("csv").option("header","true").schema(dataSchema).load("abfss://data@datalake2000.dfs.core.windows.net/raw/Log.csv") # Synapse Analytics内のDatalake Gen2を使用しているのでアクセスキーが不要 df.printSchema() # データフレームを専用SQLプールにテーブルとしてロードする df.write.sqlanalytics("<DBName>.<Schema>.<TableName>", <TableType>)
Sparkテーブルの作成
Sparkプールのメタストア上に、Sparkテーブルを作成することができる。
分析用の一時的なテーブルとして使用すること。Sparkプールが停止すると消えてしまう。
利点として、サーバレスSQLプールでSparkテーブルを共有して利用できる。
注意点:SparkテーブルではDatetime型が利用できない。
%%spark val df = spark.read.sqlanalytics("newpool.dbo.logdata") # 専用SQLプールのテーブルをもとにデータフレームを作成 df.write.mode("overwrite").saveAsTable("logdatainternal") # Sparkテーブルとして書き込み(上書きモード) // Then we can reference the table via SQL commands %%sql SELECT * FROM logdatainternal # Sparkテーブルに対してSQLクエリを実行
次のようにSparkデータベースを作成し、そこにSparkテーブルを作成することもできる。
%%sql CREATE DATABASE internaldb CREATE TABLE internaldb.customer(Id int,name varchar(200)) USING Parquet %%sql INSERT INTO internaldb.customer VALUES(1,'UserA') %%sql SELECT * FROM internaldb.customer // If you want to load data from the log.csv file and then save to a table %%pyspark df = spark.read.load('abfss://data@datalake2000.dfs.core.windows.net/raw/Log.csv', format='csv' , header=True ) df.write.mode("overwrite").saveAsTable("internaldb.logdatanew") %%sql SELECT * FROM internaldb.logdatanew
Azure Databricks
Databricksはクラウドサービス。AWSやAzureでも利用できる。
ワークスペースとして、Databricksクラスターを作成する。
クラスターにはSparkエンジンや他のコンポーネントがインストールされている。
- Interactive Cluster
- ノートブックを利用してインタラクティブにデータ分析が行える。
- Standard ClusterとHigh Concurrency Clusterがある
Job Cluster
- ジョブを渡して実行させる。
- ジョブが完了したら、ターミネートされる。
Standard Cluster
High Concurrency Cluster
Single Node
- ノードが1つだけ。
- 1つのノードでドライバーノードとワーカノードを兼ねる
Databricks環境の構築
料金
- Databricks Units(DBUs)とVMの数に応じて課金される。
- 新しいクラスターでタスクを実行すると、タスクは Data Engineering (タスク) ワークロードとして扱われ、タスク ワークロードの料金が適用されます。
- 既存の汎用クラスターでタスクを実行すると、タスクはデータ分析 (汎用) ワークロードとして扱われ、汎用ワークロードの料金が適用されます。
Databricks File System
- オブジェクトストレージ上の抽象レイヤー
- これを介して、オブジェクトストレージとやり取りできる
- クラスターが削除されてもファイルは維持される
- デフォルトのストレージロケーションはDBFSルートと呼べれる
- 以下のDBFSルートが存在する
%fs ls
でDBFS内のファイルをリスト表示できる%fs
がDBFS用のマジックコマンド
グラフなどの作成
Azure Databricksのノートブック上で、データフレームから簡単にグラフなどを作成できる。
https://learn.microsoft.com/ja-jp/azure/databricks/visualizations/
クラスターのピン留め
https://learn.microsoft.com/ja-jp/azure/databricks/clusters/clusters-manage#--pin-a-cluster
Azure Synapse への構造化ストリーミング書き込み
https://learn.microsoft.com/ja-jp/azure/databricks/getting-started/streaming
https://learn.microsoft.com/ja-jp/azure/databricks/structured-streaming/synapse
- 1つめの参考サイトでは、構造化ストリーミングとは何かについて説明している。
- Azure Databricksワークスペース内に受信しているストリーミングデータをDatabriksのインメモリテーブルに書き込んでいる。
- 2つ目のサイトでは、なんらかの方法で受信したストリーミングデータをAzure Synapse AnalyticsのSQLテーブルに書き込んでいる。
# Set up the Blob storage account access key in the notebook session conf. spark.conf.set( "fs.azure.account.key.<your-storage-account-name>.dfs.core.windows.net", "<your-storage-account-access-key>") # Prepare streaming source; this could be Kafka or a simple rate stream. df = spark.readStream \ .format("rate") \ .option("rowsPerSecond", "100000") \ .option("numPartitions", "16") \ .load() # Apply some transformations to the data then use # Structured Streaming API to continuously write the data to a table in Azure Synapse. df.writeStream \ .format("com.databricks.spark.sqldw") \ .option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \ .option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>") \ .option("forwardSparkAzureStorageCredentials", "true") \ .option("dbTable", "<your-table-name>") \ .option("checkpointLocation", "/tmp_checkpoint_location") \ .start()
Azure Active Directory 資格情報パススルーを使用して Azure Data Lake Storage にアクセスする
- Azure Databricks へのログインに使用したものと同じ Azure Active Directory (Azure AD) ID を使用して、Azure Databricks クラスターから Azure Data Lake Storage Gen2 (ADLS Gen2) に自動で認証することができます。
- クラスターで Azure Data Lake Storage 資格情報パススルーを有効にすると、そのクラスターで実行するコマンドは、ストレージにアクセスするためのサービス プリンシパル資格情報を構成しなくても、Azure Data Lake Storage のデータの読み取りと書き込みができるようになります。
- Azure Data Lake Storage資格情報パススルーは、Azure Data Lake Storage Gen1 と Gen2 でのみサポート。
- Azure Databricks ワークスペースは、Premium プランでなければならない。
ファイル取込みに関するベストプラクティス
- ファイルは、生のまま取り込んでRAWゾーンに保管する。その後、不要なデータを落としてFiltered Zondeに保管する。最後に変換/統合を行って、Curatedゾーンに保管する。
- フォルダの階層は次のようにする。 /<ゾーン名>/<データソース名>/YYYY/MM/DD/<ファイル名>
- Parquetなどの圧縮を用いる。
- ファイルを分割し、複数のコンピュートノードに割り当てられるようにする。
Azure Monitor
- メトリクスに対してアクショングループを作成して特定の条件を満たした場合にメールで通知を送ったりできる。
- メトリクスに対してアラートを設定して異常を通知できる。
緩やかに変化するディメンション
https://learn.microsoft.com/ja-jp/power-bi/guidance/star-schema#slowly-changing-dimensions
https://en.wikipedia.org/wiki/Slowly_changing_dimension
- SCD(Slowly Changes Dimensions)
- 例えば、町の名前、製品の名前、税率などがある。
Type 0:変更を認めない
- 属性は変更されることはなく、永続的な値を持つ
Type 1:上書き
- 古いデータが新しいデータで上書きされるため、履歴データは追跡されない
- 欠点は、変更の履歴が残らないこと。
- 利点は、メンテナンスが容易であること。
変更前
顧客コード | 顧客名 | 顧客本社都道府県 |
---|---|---|
101 | 山下工務店 | 埼玉県 |
変更後(顧客本社都道府県を変更)
顧客コード | 顧客名 | 顧客本社都道府県 |
---|---|---|
101 | 山下工務店 | 東京都 |
Type 2:新しい行を追加
- 代理キー(レコードの開始終了日)を使用し、履歴データを追跡する。
- 無制限の履歴が保存される。
- 他にもバージョン番号やフラグを使用する。
顧客コード | 顧客名 | 顧客本社都道府県 | 開始日 | 終了日 |
---|---|---|---|---|
101 | 山下工務店 | 埼玉県 | 2000-01-01T00:00:00 | 2004-12-22T00:00:00 |
101 | 山下工務店 | 東京都 | 2004-12-22T00:00:00 | NULL |
Type 3:新しい属性を追加
- 列を追加して変更を追跡する。
- 限られた履歴を保持する。
- 「オリジナル」ではなく、「前回」とかでもOK。
顧客コード | 顧客名 | 顧客本社都道府県_オリジナル | 変更日 | 顧客本社都道府県_現在 |
---|---|---|---|---|
101 | 山下工務店 | 埼玉県 | 2004-12-22T00:00:00 | 東京都 |
Type 4:履歴テーブルの追加
- 1つのテーブルが現在のデータを保持。
- 追加のテーブルが変更の記録を保持。
顧客テーブル
顧客コード | 顧客名 | 顧客本社都道府県 |
---|---|---|
101 | 山下工務店 | 東京都 |
顧客_履歴テーブル
顧客コード | 顧客名 | 顧客本社都道府県 | 開始日 |
---|---|---|---|
101 | 山下工務店 | 埼玉県 | 2000-01-01T00:00:00 |
101 | 山下工務店 | 東京都 | 2004-12-22T00:00:00 |
Type 5
- タイプ1とタイプ4の組み合わせ
- 詳細確認要
Type 6:複合アプローチ
- タイプ1、2、3のアプローチを組み合わせたもの
Type 7:代理キーと自然キーの両方をファクト テーブルに配置
- 詳細確認要
File Format
Parquet ファイル
https://learn.microsoft.com/ja-jp/azure/databricks/external-data/parquet
- 分析ワークロードの大規模読み込みに最適化されている。
Avro ファイル
https://learn.microsoft.com/en-us/azure/databricks/external-data/avro
- トランザクショナルワークロードの大規模書き込みに最適化されている。
Azure Information Protection
https://learn.microsoft.com/ja-jp/azure/information-protection/what-is-information-protection
Azure でのストリーム処理テクノロジの選択
- Javaでストリーミング処理開発を行いたい場合は、Azure DatabricksやSparkを利用する。